From 739325365eba5ff0fdcde97ee937491a17673293 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Tue, 9 Jun 2026 16:00:48 -0700 Subject: [PATCH 1/8] fix(sqlite): reconstruct LargeUtf8 and Utf8View on read-back --- src/sqlite_provider.rs | 40 ++++++++++++++++- tests/sqlite_provider_test.rs | 82 ++++++++++++++++++++++++++++++++++- 2 files changed, 120 insertions(+), 2 deletions(-) diff --git a/src/sqlite_provider.rs b/src/sqlite_provider.rs index 1c453ca..dae4b60 100644 --- a/src/sqlite_provider.rs +++ b/src/sqlite_provider.rs @@ -17,7 +17,9 @@ use std::any::Any; use std::fmt; use std::sync::{Arc, Mutex}; -use arrow_array::builder::{Int32Builder, Int64Builder, ListBuilder, StringBuilder}; +use arrow_array::builder::{ + Int32Builder, Int64Builder, LargeStringBuilder, ListBuilder, StringBuilder, StringViewBuilder, +}; use arrow_array::{ Array, ArrayRef, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, UInt32Array, UInt64Array, @@ -1007,6 +1009,14 @@ fn arrow_cell_to_sql(col: &ArrayRef, row: usize) -> SqlValue { .value(row); SqlValue::Text(v.to_string()) } + DataType::Utf8View => { + let v = col + .as_any() + .downcast_ref::() + .unwrap() + .value(row); + SqlValue::Text(v.to_string()) + } DataType::Int32 => SqlValue::Integer( col.as_any() .downcast_ref::() @@ -1085,6 +1095,14 @@ fn serialize_list(col: &ArrayRef, row: usize) -> String { .value(i); JV::String(s.to_string()) } + DataType::Utf8View => { + let s = list_val + .as_any() + .downcast_ref::() + .unwrap() + .value(i); + JV::String(s.to_string()) + } DataType::Int64 => { let v = list_val .as_any() @@ -1161,6 +1179,26 @@ fn sql_values_to_arrow(dt: &DataType, values: Vec) -> DFResult { + let mut b = LargeStringBuilder::with_capacity(values.len(), values.len() * 32); + for v in &values { + match v { + SqlValue::Text(s) => b.append_value(s), + _ => b.append_null(), + } + } + Arc::new(b.finish()) + } + DataType::Utf8View => { + let mut b = StringViewBuilder::with_capacity(values.len()); + for v in &values { + match v { + SqlValue::Text(s) => b.append_value(s), + _ => b.append_null(), + } + } + Arc::new(b.finish()) + } DataType::List(item_field) => match item_field.data_type() { DataType::Utf8 | DataType::LargeUtf8 => { let mut b = diff --git a/tests/sqlite_provider_test.rs b/tests/sqlite_provider_test.rs index ee85449..d843b14 100644 --- a/tests/sqlite_provider_test.rs +++ b/tests/sqlite_provider_test.rs @@ -2,7 +2,9 @@ use std::sync::Arc; -use arrow_array::{Array, Int64Array, RecordBatch, StringArray, UInt64Array}; +use arrow_array::{ + Array, Int64Array, LargeStringArray, RecordBatch, StringArray, StringViewArray, UInt64Array, +}; use arrow_schema::{DataType, Field, Schema}; use datafusion::catalog::TableProvider; use datafusion::prelude::SessionContext; @@ -294,6 +296,84 @@ fn test_stream_builder_validation_errors() { assert!(b_text.push_batch(&text_batch).is_err()); } +/// Regression test for hotdata-dev/runtimedb#631: payload columns typed as +/// `LargeUtf8` (what DuckDB/parquet readers emit for strings) or `Utf8View` +/// (what a `::text` cast produces) must round-trip through the sidecar. The +/// write side already mapped both to TEXT, but the read-back path was missing +/// the reconstruction arms and failed with "unsupported Arrow type LargeUtf8". +#[tokio::test] +async fn test_string_view_and_large_utf8_roundtrip() { + let dir = tempdir().unwrap(); + + // key + a LargeUtf8 column + a Utf8View column. + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("large", DataType::LargeUtf8, true), + Field::new("view", DataType::Utf8View, true), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![10_i64, 20, 30])), + Arc::new(LargeStringArray::from(vec![ + Some("alpha"), + None, + Some("gamma"), + ])), + Arc::new(StringViewArray::from(vec![Some("one"), Some("two"), None])), + ], + ) + .unwrap(); + + let db_path = dir.path().join("strings.db"); + let mut builder = SqliteSidecarBuilder::begin( + db_path.to_str().unwrap(), + "models", + 2, + schema, + 0, // key (rowid) is column 0 + vec![1, 2], // columns 1 (large) and 2 (view) are stored values + ) + .unwrap(); + builder.push_batch(&batch).unwrap(); + let provider = builder.finish().unwrap(); + + let batches = provider + .fetch_by_keys(&[10, 30], "rowid", None) + .await + .unwrap(); + assert_eq!(batches.iter().map(|b| b.num_rows()).sum::(), 2); + + // The reconstructed columns must preserve their original Arrow types and values. + let mut large: Vec> = Vec::new(); + let mut view: Vec> = Vec::new(); + for b in &batches { + let l = b + .column_by_name("large") + .unwrap() + .as_any() + .downcast_ref::() + .expect("large column should reconstruct as LargeStringArray"); + let v = b + .column_by_name("view") + .unwrap() + .as_any() + .downcast_ref::() + .expect("view column should reconstruct as StringViewArray"); + for i in 0..b.num_rows() { + large.push(l.is_valid(i).then(|| l.value(i).to_string())); + view.push(v.is_valid(i).then(|| v.value(i).to_string())); + } + } + + assert!(large.contains(&Some("alpha".to_string()))); + assert!(large.contains(&Some("gamma".to_string()))); + assert!(view.contains(&Some("one".to_string()))); + // rowid 30 had a null view value, which must survive the round-trip. + assert!(view.contains(&None)); +} + #[tokio::test] async fn test_projection() { let dir = tempdir().unwrap(); From ef8ee546798c128c328e98d9926a4c64ec005ac6 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Tue, 9 Jun 2026 16:09:26 -0700 Subject: [PATCH 2/8] fix(sqlite): reconstruct List on read-back --- src/sqlite_provider.rs | 18 +++++++++++ tests/sqlite_provider_test.rs | 59 +++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/src/sqlite_provider.rs b/src/sqlite_provider.rs index dae4b60..a8d2fe4 100644 --- a/src/sqlite_provider.rs +++ b/src/sqlite_provider.rs @@ -1218,6 +1218,24 @@ fn sql_values_to_arrow(dt: &DataType, values: Vec) -> DFResult { + let mut b = ListBuilder::new(StringViewBuilder::new()) + .with_field(item_field.as_ref().clone()); + for v in &values { + match v { + SqlValue::Text(s) => { + let items: Vec> = + serde_json::from_str(s).unwrap_or_default(); + for item in items { + b.values().append_option(item); + } + b.append(true); + } + _ => b.append(false), + } + } + Arc::new(b.finish()) + } DataType::Int64 => { let mut b = ListBuilder::new(Int64Builder::new()).with_field(item_field.as_ref().clone()); diff --git a/tests/sqlite_provider_test.rs b/tests/sqlite_provider_test.rs index d843b14..e36b1f0 100644 --- a/tests/sqlite_provider_test.rs +++ b/tests/sqlite_provider_test.rs @@ -374,6 +374,65 @@ async fn test_string_view_and_large_utf8_roundtrip() { assert!(view.contains(&None)); } +/// Companion to the scalar regression above: a `List` payload must +/// also round-trip. The write side serializes list elements to JSON TEXT, so a +/// missing `Utf8View` reconstruction arm would write real values then fail on +/// read-back with "unsupported list item type Utf8View". +#[tokio::test] +async fn test_list_utf8view_roundtrip() { + use arrow_array::ListArray; + use arrow_array::builder::{ListBuilder, StringViewBuilder}; + + let dir = tempdir().unwrap(); + + let item_field = Arc::new(Field::new("item", DataType::Utf8View, true)); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("tags", DataType::List(item_field.clone()), true), + ])); + + // Two rows, each a list of Utf8View strings (including a null element). + let mut lb = ListBuilder::new(StringViewBuilder::new()).with_field(item_field); + lb.values().append_value("red"); + lb.values().append_null(); + lb.append(true); + lb.values().append_value("blue"); + lb.values().append_value("green"); + lb.append(true); + let tags = lb.finish(); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int64Array::from(vec![1_i64, 2])), Arc::new(tags)], + ) + .unwrap(); + + let db_path = dir.path().join("lists.db"); + let mut builder = + SqliteSidecarBuilder::begin(db_path.to_str().unwrap(), "models", 2, schema, 0, vec![1]) + .unwrap(); + builder.push_batch(&batch).unwrap(); + let provider = builder.finish().unwrap(); + + let batches = provider.fetch_by_keys(&[1], "rowid", None).await.unwrap(); + assert_eq!(batches.iter().map(|b| b.num_rows()).sum::(), 1); + + let list = batches[0] + .column_by_name("tags") + .unwrap() + .as_any() + .downcast_ref::() + .expect("tags should reconstruct as a List"); + let inner = list.value(0); + let strs = inner + .as_any() + .downcast_ref::() + .expect("list items should reconstruct as StringViewArray"); + assert_eq!(strs.len(), 2); + assert_eq!(strs.value(0), "red"); + assert!(strs.is_null(1)); +} + #[tokio::test] async fn test_projection() { let dir = tempdir().unwrap(); From 9e7fff25b62b2d8698f892d18605360b565ac8bf Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Tue, 9 Jun 2026 16:13:26 -0700 Subject: [PATCH 3/8] fix(sqlite): use LargeStringBuilder for List read-back --- src/sqlite_provider.rs | 20 ++++++++++++- tests/sqlite_provider_test.rs | 53 +++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/src/sqlite_provider.rs b/src/sqlite_provider.rs index a8d2fe4..7db2af0 100644 --- a/src/sqlite_provider.rs +++ b/src/sqlite_provider.rs @@ -1200,7 +1200,7 @@ fn sql_values_to_arrow(dt: &DataType, values: Vec) -> DFResult match item_field.data_type() { - DataType::Utf8 | DataType::LargeUtf8 => { + DataType::Utf8 => { let mut b = ListBuilder::new(StringBuilder::new()).with_field(item_field.as_ref().clone()); for v in &values { @@ -1218,6 +1218,24 @@ fn sql_values_to_arrow(dt: &DataType, values: Vec) -> DFResult { + let mut b = ListBuilder::new(LargeStringBuilder::new()) + .with_field(item_field.as_ref().clone()); + for v in &values { + match v { + SqlValue::Text(s) => { + let items: Vec> = + serde_json::from_str(s).unwrap_or_default(); + for item in items { + b.values().append_option(item); + } + b.append(true); + } + _ => b.append(false), + } + } + Arc::new(b.finish()) + } DataType::Utf8View => { let mut b = ListBuilder::new(StringViewBuilder::new()) .with_field(item_field.as_ref().clone()); diff --git a/tests/sqlite_provider_test.rs b/tests/sqlite_provider_test.rs index e36b1f0..46b27d1 100644 --- a/tests/sqlite_provider_test.rs +++ b/tests/sqlite_provider_test.rs @@ -433,6 +433,59 @@ async fn test_list_utf8view_roundtrip() { assert!(strs.is_null(1)); } +/// A `List` payload must reconstruct with a `LargeStringArray` child. +/// Arrow's `ListBuilder::finish()` panics if the declared item field type does +/// not match the child builder, so `LargeUtf8` lists must not be routed through +/// a `Utf8` `StringBuilder`. +#[tokio::test] +async fn test_list_largeutf8_roundtrip() { + use arrow_array::ListArray; + use arrow_array::builder::{LargeStringBuilder, ListBuilder}; + + let dir = tempdir().unwrap(); + + let item_field = Arc::new(Field::new("item", DataType::LargeUtf8, true)); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("tags", DataType::List(item_field.clone()), true), + ])); + + let mut lb = ListBuilder::new(LargeStringBuilder::new()).with_field(item_field); + lb.values().append_value("red"); + lb.values().append_null(); + lb.append(true); + let tags = lb.finish(); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int64Array::from(vec![1_i64])), Arc::new(tags)], + ) + .unwrap(); + + let db_path = dir.path().join("large_lists.db"); + let mut builder = + SqliteSidecarBuilder::begin(db_path.to_str().unwrap(), "models", 2, schema, 0, vec![1]) + .unwrap(); + builder.push_batch(&batch).unwrap(); + let provider = builder.finish().unwrap(); + + let batches = provider.fetch_by_keys(&[1], "rowid", None).await.unwrap(); + let list = batches[0] + .column_by_name("tags") + .unwrap() + .as_any() + .downcast_ref::() + .expect("tags should reconstruct as a List"); + let inner = list.value(0); + let strs = inner + .as_any() + .downcast_ref::() + .expect("list items should reconstruct as LargeStringArray"); + assert_eq!(strs.len(), 2); + assert_eq!(strs.value(0), "red"); + assert!(strs.is_null(1)); +} + #[tokio::test] async fn test_projection() { let dir = tempdir().unwrap(); From c1802e31200ad448abf87b6bbc457ea9f756c67f Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Tue, 9 Jun 2026 16:27:27 -0700 Subject: [PATCH 4/8] fix(sqlite): reconstruct LargeList payloads on read-back --- src/sqlite_provider.rs | 145 +++++++++++----------------------- tests/sqlite_provider_test.rs | 52 ++++++++++++ 2 files changed, 98 insertions(+), 99 deletions(-) diff --git a/src/sqlite_provider.rs b/src/sqlite_provider.rs index 7db2af0..9d453a0 100644 --- a/src/sqlite_provider.rs +++ b/src/sqlite_provider.rs @@ -18,13 +18,14 @@ use std::fmt; use std::sync::{Arc, Mutex}; use arrow_array::builder::{ - Int32Builder, Int64Builder, LargeStringBuilder, ListBuilder, StringBuilder, StringViewBuilder, + GenericListBuilder, Int32Builder, Int64Builder, LargeStringBuilder, StringBuilder, + StringViewBuilder, }; use arrow_array::{ - Array, ArrayRef, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, - UInt32Array, UInt64Array, + Array, ArrayRef, Float32Array, Float64Array, Int32Array, Int64Array, OffsetSizeTrait, + RecordBatch, StringArray, UInt32Array, UInt64Array, }; -use arrow_schema::{DataType, SchemaRef}; +use arrow_schema::{DataType, FieldRef, SchemaRef}; use async_trait::async_trait; use datafusion::catalog::{Session, TableProvider}; use datafusion::common::Result as DFResult; @@ -1127,6 +1128,45 @@ fn serialize_list(col: &ArrayRef, row: usize) -> String { serde_json::to_string(&items).unwrap_or_else(|_| "[]".to_string()) } +/// Reconstruct a `GenericListArray` (regular `List` when `O = i32`, +/// `LargeList` when `O = i64`) from the JSON TEXT that `serialize_list` writes. +/// The serialized form is identical for both offset widths, so the only +/// difference is the outer builder's offset type — hence the generic. +fn json_text_to_list( + item_field: &FieldRef, + values: &[SqlValue], +) -> DFResult { + macro_rules! build_list { + ($child:expr, $item:ty) => {{ + let mut b = + GenericListBuilder::::new($child).with_field(item_field.as_ref().clone()); + for v in values { + match v { + SqlValue::Text(s) => { + let items: Vec> = serde_json::from_str(s).unwrap_or_default(); + for item in items { + b.values().append_option(item); + } + b.append(true); + } + _ => b.append(false), + } + } + Arc::new(b.finish()) as ArrayRef + }}; + } + Ok(match item_field.data_type() { + DataType::Utf8 => build_list!(StringBuilder::new(), String), + DataType::LargeUtf8 => build_list!(LargeStringBuilder::new(), String), + DataType::Utf8View => build_list!(StringViewBuilder::new(), String), + DataType::Int64 => build_list!(Int64Builder::new(), i64), + DataType::Int32 => build_list!(Int32Builder::new(), i32), + inner => Err(DataFusionError::Execution(format!( + "SqliteLookupProvider: unsupported list item type {inner:?}" + )))?, + }) +} + fn sql_values_to_arrow(dt: &DataType, values: Vec) -> DFResult { Ok(match dt { DataType::UInt64 => { @@ -1199,101 +1239,8 @@ fn sql_values_to_arrow(dt: &DataType, values: Vec) -> DFResult match item_field.data_type() { - DataType::Utf8 => { - let mut b = - ListBuilder::new(StringBuilder::new()).with_field(item_field.as_ref().clone()); - for v in &values { - match v { - SqlValue::Text(s) => { - let items: Vec> = - serde_json::from_str(s).unwrap_or_default(); - for item in items { - b.values().append_option(item); - } - b.append(true); - } - _ => b.append(false), - } - } - Arc::new(b.finish()) - } - DataType::LargeUtf8 => { - let mut b = ListBuilder::new(LargeStringBuilder::new()) - .with_field(item_field.as_ref().clone()); - for v in &values { - match v { - SqlValue::Text(s) => { - let items: Vec> = - serde_json::from_str(s).unwrap_or_default(); - for item in items { - b.values().append_option(item); - } - b.append(true); - } - _ => b.append(false), - } - } - Arc::new(b.finish()) - } - DataType::Utf8View => { - let mut b = ListBuilder::new(StringViewBuilder::new()) - .with_field(item_field.as_ref().clone()); - for v in &values { - match v { - SqlValue::Text(s) => { - let items: Vec> = - serde_json::from_str(s).unwrap_or_default(); - for item in items { - b.values().append_option(item); - } - b.append(true); - } - _ => b.append(false), - } - } - Arc::new(b.finish()) - } - DataType::Int64 => { - let mut b = - ListBuilder::new(Int64Builder::new()).with_field(item_field.as_ref().clone()); - for v in &values { - match v { - SqlValue::Text(s) => { - let items: Vec> = - serde_json::from_str(s).unwrap_or_default(); - for item in items { - b.values().append_option(item); - } - b.append(true); - } - _ => b.append(false), - } - } - Arc::new(b.finish()) - } - DataType::Int32 => { - let mut b = - ListBuilder::new(Int32Builder::new()).with_field(item_field.as_ref().clone()); - for v in &values { - match v { - SqlValue::Text(s) => { - let items: Vec> = - serde_json::from_str(s).unwrap_or_default(); - for item in items { - b.values().append_option(item); - } - b.append(true); - } - _ => b.append(false), - } - } - Arc::new(b.finish()) - } - inner => Err(DataFusionError::Execution(format!( - "SqliteLookupProvider: unsupported list item type {inner:?}" - )))?, - }, + DataType::List(item_field) => json_text_to_list::(item_field, &values)?, + DataType::LargeList(item_field) => json_text_to_list::(item_field, &values)?, DataType::Float64 => { let arr: Float64Array = values .iter() diff --git a/tests/sqlite_provider_test.rs b/tests/sqlite_provider_test.rs index 46b27d1..6409414 100644 --- a/tests/sqlite_provider_test.rs +++ b/tests/sqlite_provider_test.rs @@ -486,6 +486,58 @@ async fn test_list_largeutf8_roundtrip() { assert!(strs.is_null(1)); } +/// A `LargeList` payload must round-trip. The write side serializes both +/// `List` and `LargeList` to JSON TEXT, but the read-back path only reconstructed +/// `List`, so a `LargeList` column failed with "unsupported Arrow type LargeList". +#[tokio::test] +async fn test_largelist_roundtrip() { + use arrow_array::LargeListArray; + use arrow_array::builder::{LargeListBuilder, StringBuilder}; + + let dir = tempdir().unwrap(); + + let item_field = Arc::new(Field::new("item", DataType::Utf8, true)); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("tags", DataType::LargeList(item_field.clone()), true), + ])); + + let mut lb = LargeListBuilder::new(StringBuilder::new()).with_field(item_field); + lb.values().append_value("red"); + lb.values().append_null(); + lb.append(true); + let tags = lb.finish(); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int64Array::from(vec![1_i64])), Arc::new(tags)], + ) + .unwrap(); + + let db_path = dir.path().join("largelist.db"); + let mut builder = + SqliteSidecarBuilder::begin(db_path.to_str().unwrap(), "models", 2, schema, 0, vec![1]) + .unwrap(); + builder.push_batch(&batch).unwrap(); + let provider = builder.finish().unwrap(); + + let batches = provider.fetch_by_keys(&[1], "rowid", None).await.unwrap(); + let list = batches[0] + .column_by_name("tags") + .unwrap() + .as_any() + .downcast_ref::() + .expect("tags should reconstruct as a LargeList"); + let inner = list.value(0); + let strs = inner + .as_any() + .downcast_ref::() + .expect("list items should reconstruct as StringArray"); + assert_eq!(strs.len(), 2); + assert_eq!(strs.value(0), "red"); + assert!(strs.is_null(1)); +} + #[tokio::test] async fn test_projection() { let dir = tempdir().unwrap(); From d1cd4094897a3e7d0583b3f5eb6b3b972b6c0922 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Tue, 9 Jun 2026 16:30:35 -0700 Subject: [PATCH 5/8] test(sqlite): cover null and empty list cells on read-back --- tests/sqlite_provider_test.rs | 53 ++++++++++++++++++++++++++++++++--- 1 file changed, 49 insertions(+), 4 deletions(-) diff --git a/tests/sqlite_provider_test.rs b/tests/sqlite_provider_test.rs index 6409414..e46a247 100644 --- a/tests/sqlite_provider_test.rs +++ b/tests/sqlite_provider_test.rs @@ -391,19 +391,25 @@ async fn test_list_utf8view_roundtrip() { Field::new("tags", DataType::List(item_field.clone()), true), ])); - // Two rows, each a list of Utf8View strings (including a null element). + // Four rows exercising: a populated list with a null element, another + // populated list, a NULL list cell, and an empty list. let mut lb = ListBuilder::new(StringViewBuilder::new()).with_field(item_field); lb.values().append_value("red"); lb.values().append_null(); - lb.append(true); + lb.append(true); // rowid 1: ["red", null] lb.values().append_value("blue"); lb.values().append_value("green"); - lb.append(true); + lb.append(true); // rowid 2: ["blue", "green"] + lb.append(false); // rowid 3: NULL list + lb.append(true); // rowid 4: [] (empty list) let tags = lb.finish(); let batch = RecordBatch::try_new( schema.clone(), - vec![Arc::new(Int64Array::from(vec![1_i64, 2])), Arc::new(tags)], + vec![ + Arc::new(Int64Array::from(vec![1_i64, 2, 3, 4])), + Arc::new(tags), + ], ) .unwrap(); @@ -431,6 +437,45 @@ async fn test_list_utf8view_roundtrip() { assert_eq!(strs.len(), 2); assert_eq!(strs.value(0), "red"); assert!(strs.is_null(1)); + + // A NULL list cell survives as a null list entry; an empty list survives + // as a present-but-empty list (length 0), not as null. + let nullable = provider + .fetch_by_keys(&[3, 4], "rowid", None) + .await + .unwrap(); + let mut null_lists = 0usize; + let mut empty_lists = 0usize; + for b in &nullable { + let rowid = b + .column_by_name("rowid") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let list = b + .column_by_name("tags") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..b.num_rows() { + match rowid.value(i) { + 3 => { + assert!(list.is_null(i), "rowid 3 should be a NULL list"); + null_lists += 1; + } + 4 => { + assert!(list.is_valid(i), "rowid 4 should be a present empty list"); + assert_eq!(list.value(i).len(), 0, "rowid 4 should be empty"); + empty_lists += 1; + } + other => panic!("unexpected rowid {other}"), + } + } + } + assert_eq!(null_lists, 1); + assert_eq!(empty_lists, 1); } /// A `List` payload must reconstruct with a `LargeStringArray` child. From dd2100c5bb871c1c2013daf54453e9e18ae4f243 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Wed, 10 Jun 2026 12:47:38 +0530 Subject: [PATCH 6/8] feat(sqlite): validate payload types at build time MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #27 closed the reachable LargeUtf8/Utf8View/list read-back gaps, but the underlying defect class remained: the write side accepted any Arrow type (silent TEXT/NULL fallback) while the read side reconstructed only a subset, so an unsupported payload column built a "successful" sidecar that then failed on the first query (runtimedb#631). Close the class structurally: - Add `payload_type_supported` as the single source of truth for which Arrow types round-trip, and `validate_payload_schema` to gate every build entry point (`SqliteSidecarBuilder::begin`, `open_or_build`). An unsupported column now fails at index-build time with the column named, instead of at query time — which is where runtimedb's index-create step will surface it. - Widen actual support to the common parquet/DuckDB scalar types that previously fell through: Boolean, Date32, Date64, and Timestamp (all units, time zone preserved on read-back). Tests: reject a Decimal128 payload at build time; round-trip Boolean, Date32, and a tz-carrying Timestamp. --- src/sqlite_provider.rs | 184 +++++++++++++++++++++++++++++++++- tests/sqlite_provider_test.rs | 134 +++++++++++++++++++++++++ 2 files changed, 315 insertions(+), 3 deletions(-) diff --git a/src/sqlite_provider.rs b/src/sqlite_provider.rs index 9d453a0..2ac99b7 100644 --- a/src/sqlite_provider.rs +++ b/src/sqlite_provider.rs @@ -25,7 +25,7 @@ use arrow_array::{ Array, ArrayRef, Float32Array, Float64Array, Int32Array, Int64Array, OffsetSizeTrait, RecordBatch, StringArray, UInt32Array, UInt64Array, }; -use arrow_schema::{DataType, FieldRef, SchemaRef}; +use arrow_schema::{DataType, FieldRef, SchemaRef, TimeUnit}; use async_trait::async_trait; use datafusion::catalog::{Session, TableProvider}; use datafusion::common::Result as DFResult; @@ -213,6 +213,7 @@ impl SqliteLookupProvider { "pool_size must be at least 1".into(), )); } + validate_payload_schema(&schema, 0)?; let conn = open_conn(db_path)?; let table_exists: bool = conn @@ -336,6 +337,7 @@ impl SqliteSidecarBuilder { value_col_indices.len() ))); } + validate_payload_schema(&schema, key_col_index)?; let (create_sql, insert_sql) = ddl(table_name, &schema); let conn = open_conn(db_path)?; // Manual BEGIN/COMMIT rather than a borrowed `Transaction` so the @@ -981,11 +983,85 @@ fn build_table( // ── Type conversion helpers ─────────────────────────────────────────────────── +/// Single source of truth for which Arrow types this provider can store in the +/// sidecar **and** reconstruct on read-back. Every type that returns `true` here +/// must have a matching arm in all of `arrow_cell_to_sql` (write), `arrow_type_to_sql` +/// (column affinity), and `sql_values_to_arrow` (read). `validate_payload_schema` +/// gates every build against this set so an unsupported column fails at index +/// build time — not later at query time, half a sidecar in. +fn payload_type_supported(dt: &DataType) -> bool { + match dt { + DataType::Boolean + | DataType::Int32 + | DataType::Int64 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Float32 + | DataType::Float64 + | DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Utf8View + | DataType::Date32 + | DataType::Date64 + | DataType::Timestamp(_, _) => true, + DataType::List(item) | DataType::LargeList(item) => { + list_item_type_supported(item.data_type()) + } + _ => false, + } +} + +/// Item types reconstructable by `json_text_to_list`. Lists are serialized to +/// JSON TEXT, so only types with a JSON representation `serialize_list` writes +/// and `json_text_to_list` reads are supported. +fn list_item_type_supported(dt: &DataType) -> bool { + matches!( + dt, + DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Utf8View + | DataType::Int32 + | DataType::Int64 + ) +} + +/// Reject a build whose payload columns include a type the read-back path cannot +/// reconstruct. Called from every build entry point so the failure surfaces at +/// index-creation time with the offending column named, rather than as a +/// `unsupported Arrow type` error on the first query against a sidecar that +/// "built successfully". `key_col_index` is skipped — keys are validated +/// separately by `extract_key`. +fn validate_payload_schema(schema: &SchemaRef, key_col_index: usize) -> DFResult<()> { + for (i, field) in schema.fields().iter().enumerate() { + if i == key_col_index { + continue; + } + if !payload_type_supported(field.data_type()) { + return Err(DataFusionError::Execution(format!( + "SqliteLookupProvider: payload column '{}' has unsupported type {:?}. \ + Supported types: Boolean, Int32/64, UInt32/64, Float32/64, \ + Utf8/LargeUtf8/Utf8View, Date32/64, Timestamp, and List/LargeList \ + of {{Utf8, LargeUtf8, Utf8View, Int32, Int64}}.", + field.name(), + field.data_type() + ))); + } + } + Ok(()) +} + fn arrow_type_to_sql(dt: &DataType) -> &'static str { match dt { - DataType::UInt64 | DataType::UInt32 | DataType::Int32 | DataType::Int64 => "INTEGER", + DataType::UInt64 + | DataType::UInt32 + | DataType::Int32 + | DataType::Int64 + | DataType::Boolean + | DataType::Date32 + | DataType::Date64 + | DataType::Timestamp(_, _) => "INTEGER", DataType::Float32 | DataType::Float64 => "REAL", - _ => "TEXT", // Utf8, LargeUtf8, List variants → TEXT (JSON for lists) + _ => "TEXT", // Utf8, LargeUtf8, Utf8View, List variants → TEXT (JSON for lists) } } @@ -1057,7 +1133,54 @@ fn arrow_cell_to_sql(col: &ArrayRef, row: usize) -> SqlValue { .unwrap() .value(row), ), + DataType::Boolean => SqlValue::Integer( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) as i64, + ), + DataType::Date32 => SqlValue::Integer( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) as i64, + ), + DataType::Date64 => SqlValue::Integer( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row), + ), + // Timestamps are stored as their raw i64 tick count; the unit and time + // zone live in the schema and are restored on read-back. + DataType::Timestamp(unit, _) => { + let v = match unit { + TimeUnit::Second => col + .as_any() + .downcast_ref::() + .unwrap() + .value(row), + TimeUnit::Millisecond => col + .as_any() + .downcast_ref::() + .unwrap() + .value(row), + TimeUnit::Microsecond => col + .as_any() + .downcast_ref::() + .unwrap() + .value(row), + TimeUnit::Nanosecond => col + .as_any() + .downcast_ref::() + .unwrap() + .value(row), + }; + SqlValue::Integer(v) + } DataType::List(_) | DataType::LargeList(_) => SqlValue::Text(serialize_list(col, row)), + // Unreachable for any schema that passed `validate_payload_schema`; kept + // as a defensive fallback rather than a panic. _ => SqlValue::Null, } } @@ -1239,6 +1362,61 @@ fn sql_values_to_arrow(dt: &DataType, values: Vec) -> DFResult { + let arr: arrow_array::BooleanArray = values + .iter() + .map(|v| match v { + SqlValue::Integer(i) => Some(*i != 0), + _ => None, + }) + .collect(); + Arc::new(arr) + } + DataType::Date32 => { + let arr: arrow_array::Date32Array = values + .iter() + .map(|v| match v { + SqlValue::Integer(i) => Some(*i as i32), + _ => None, + }) + .collect(); + Arc::new(arr) + } + DataType::Date64 => { + let arr: arrow_array::Date64Array = values + .iter() + .map(|v| match v { + SqlValue::Integer(i) => Some(*i), + _ => None, + }) + .collect(); + Arc::new(arr) + } + // Rebuild the unit-specific array, then restore the schema's time zone so + // the reconstructed column's DataType matches the original exactly. + DataType::Timestamp(unit, tz) => { + let it = values.iter().map(|v| match v { + SqlValue::Integer(i) => Some(*i), + _ => None, + }); + match unit { + TimeUnit::Second => Arc::new( + arrow_array::TimestampSecondArray::from_iter(it).with_timezone_opt(tz.clone()), + ), + TimeUnit::Millisecond => Arc::new( + arrow_array::TimestampMillisecondArray::from_iter(it) + .with_timezone_opt(tz.clone()), + ), + TimeUnit::Microsecond => Arc::new( + arrow_array::TimestampMicrosecondArray::from_iter(it) + .with_timezone_opt(tz.clone()), + ), + TimeUnit::Nanosecond => Arc::new( + arrow_array::TimestampNanosecondArray::from_iter(it) + .with_timezone_opt(tz.clone()), + ), + } + } DataType::List(item_field) => json_text_to_list::(item_field, &values)?, DataType::LargeList(item_field) => json_text_to_list::(item_field, &values)?, DataType::Float64 => { diff --git a/tests/sqlite_provider_test.rs b/tests/sqlite_provider_test.rs index e46a247..b7c16e1 100644 --- a/tests/sqlite_provider_test.rs +++ b/tests/sqlite_provider_test.rs @@ -781,3 +781,137 @@ async fn test_custom_key_column_name() { .unwrap(); assert_eq!(key_col.value(0), 1); } + +/// The defect class behind hotdata-dev/runtimedb#631 was that the write side +/// accepted any Arrow type (silent TEXT/NULL fallback) while the read side only +/// reconstructed a subset — so an unsupported payload column built a "successful" +/// sidecar that then exploded on the first query. This asserts the failure now +/// surfaces at *build* time (`begin`) with the offending column named, which is +/// where runtimedb's index-create step will catch it. +#[tokio::test] +async fn test_unsupported_payload_type_rejected_at_build() { + let dir = tempdir().unwrap(); + + // Decimal128 is a common parquet type the read-back path cannot reconstruct. + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("price", DataType::Decimal128(10, 2), true), + ])); + + let db_path = dir.path().join("bad.db"); + let err = + SqliteSidecarBuilder::begin(db_path.to_str().unwrap(), "models", 2, schema, 0, vec![1]) + .map(|_| ()) + .expect_err("a Decimal128 payload column must be rejected at build time"); + + let msg = err.to_string(); + assert!( + msg.contains("price") && msg.contains("unsupported type"), + "error should name the offending column and type, got: {msg}" + ); +} + +/// Round-trips the basic scalar types added alongside the validation gate +/// (Boolean, Date32, and a timezone-carrying Timestamp). These are exactly the +/// kinds of columns DuckDB/parquet emit that previously fell through to the +/// "unsupported Arrow type" error on read-back. +#[tokio::test] +async fn test_basic_scalar_types_roundtrip() { + use arrow_array::{BooleanArray, Date32Array, TimestampMicrosecondArray}; + use arrow_schema::TimeUnit; + + let dir = tempdir().unwrap(); + + let ts_type = DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("flag", DataType::Boolean, true), + Field::new("day", DataType::Date32, true), + Field::new("ts", ts_type.clone(), true), + ])); + + let ts_array = + TimestampMicrosecondArray::from(vec![Some(1_000_000_i64), None, Some(3_000_000)]) + .with_timezone_opt(Some("UTC".to_string())); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![1_i64, 2, 3])), + Arc::new(BooleanArray::from(vec![Some(true), Some(false), None])), + Arc::new(Date32Array::from(vec![ + Some(19_000_i32), + None, + Some(19_002), + ])), + Arc::new(ts_array), + ], + ) + .unwrap(); + + let db_path = dir.path().join("basics.db"); + let mut builder = SqliteSidecarBuilder::begin( + db_path.to_str().unwrap(), + "models", + 2, + schema, + 0, + vec![1, 2, 3], + ) + .unwrap(); + builder.push_batch(&batch).unwrap(); + let provider = builder.finish().unwrap(); + + let batches = provider + .fetch_by_keys(&[1, 3], "rowid", None) + .await + .unwrap(); + assert_eq!(batches.iter().map(|b| b.num_rows()).sum::(), 2); + + // The timestamp column must reconstruct with its original unit *and* time + // zone, otherwise the column's DataType won't match the declared schema. + for b in &batches { + assert_eq!(b.column_by_name("ts").unwrap().data_type(), &ts_type); + b.column_by_name("flag") + .unwrap() + .as_any() + .downcast_ref::() + .expect("flag should reconstruct as BooleanArray"); + b.column_by_name("day") + .unwrap() + .as_any() + .downcast_ref::() + .expect("day should reconstruct as Date32Array"); + } + + // Spot-check rowid 1's values survived the round-trip. + let rowid_one = batches + .iter() + .flat_map(|b| { + let rid = b + .column_by_name("rowid") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let flag = b + .column_by_name("flag") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let day = b + .column_by_name("day") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + (0..b.num_rows()) + .filter(|&i| rid.value(i) == 1) + .map(|i| (flag.value(i), day.value(i))) + .collect::>() + }) + .next() + .expect("rowid 1 should be present"); + assert_eq!(rowid_one, (true, 19_000)); +} From 38763085ff4901e4e4d6db7d1f3b97d12865f3be Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Wed, 10 Jun 2026 12:53:53 +0530 Subject: [PATCH 7/8] feat(sqlite): support all basic scalar payload types MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round out the supported set with the remaining primitive Arrow types so a parquet/DuckDB column of any basic type round-trips instead of being rejected at build time: - small integers: Int8, Int16, UInt8, UInt16 (→ INTEGER) - time-of-day: Time32 (Second/Millisecond), Time64 (Microsecond/Nanosecond) (→ INTEGER, unit restored from the schema on read-back) - binary: Binary, LargeBinary (→ BLOB) These add match arms only; each column hits a single arm, so there is no runtime cost for types a given sidecar doesn't use. Float16 (needs the `half` crate) and Decimal128/256 (need lossless text encoding) remain out of scope and are still rejected early by validate_payload_schema. Test: round-trip all eight new types with null cells, asserting both the reconstructed Arrow type and the values. --- src/sqlite_provider.rs | 188 +++++++++++++++++++++++++++++++- tests/sqlite_provider_test.rs | 197 ++++++++++++++++++++++++++++++++++ 2 files changed, 381 insertions(+), 4 deletions(-) diff --git a/src/sqlite_provider.rs b/src/sqlite_provider.rs index 2ac99b7..1b5e888 100644 --- a/src/sqlite_provider.rs +++ b/src/sqlite_provider.rs @@ -992,8 +992,12 @@ fn build_table( fn payload_type_supported(dt: &DataType) -> bool { match dt { DataType::Boolean + | DataType::Int8 + | DataType::Int16 | DataType::Int32 | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 | DataType::Float32 @@ -1001,8 +1005,12 @@ fn payload_type_supported(dt: &DataType) -> bool { | DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View + | DataType::Binary + | DataType::LargeBinary | DataType::Date32 | DataType::Date64 + | DataType::Time32(_) + | DataType::Time64(_) | DataType::Timestamp(_, _) => true, DataType::List(item) | DataType::LargeList(item) => { list_item_type_supported(item.data_type()) @@ -1039,9 +1047,9 @@ fn validate_payload_schema(schema: &SchemaRef, key_col_index: usize) -> DFResult if !payload_type_supported(field.data_type()) { return Err(DataFusionError::Execution(format!( "SqliteLookupProvider: payload column '{}' has unsupported type {:?}. \ - Supported types: Boolean, Int32/64, UInt32/64, Float32/64, \ - Utf8/LargeUtf8/Utf8View, Date32/64, Timestamp, and List/LargeList \ - of {{Utf8, LargeUtf8, Utf8View, Int32, Int64}}.", + Supported types: Boolean, Int8/16/32/64, UInt8/16/32/64, Float32/64, \ + Utf8/LargeUtf8/Utf8View, Binary/LargeBinary, Date32/64, Time32/64, \ + Timestamp, and List/LargeList of {{Utf8, LargeUtf8, Utf8View, Int32, Int64}}.", field.name(), field.data_type() ))); @@ -1052,15 +1060,22 @@ fn validate_payload_schema(schema: &SchemaRef, key_col_index: usize) -> DFResult fn arrow_type_to_sql(dt: &DataType) -> &'static str { match dt { - DataType::UInt64 + DataType::UInt8 + | DataType::UInt16 | DataType::UInt32 + | DataType::UInt64 + | DataType::Int8 + | DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Boolean | DataType::Date32 | DataType::Date64 + | DataType::Time32(_) + | DataType::Time64(_) | DataType::Timestamp(_, _) => "INTEGER", DataType::Float32 | DataType::Float64 => "REAL", + DataType::Binary | DataType::LargeBinary => "BLOB", _ => "TEXT", // Utf8, LargeUtf8, Utf8View, List variants → TEXT (JSON for lists) } } @@ -1178,6 +1193,79 @@ fn arrow_cell_to_sql(col: &ArrayRef, row: usize) -> SqlValue { }; SqlValue::Integer(v) } + DataType::Int8 => SqlValue::Integer( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) as i64, + ), + DataType::Int16 => SqlValue::Integer( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) as i64, + ), + DataType::UInt8 => SqlValue::Integer( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) as i64, + ), + DataType::UInt16 => SqlValue::Integer( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) as i64, + ), + // Time-of-day is stored as its raw tick count; the unit is restored from + // the schema on read-back. Time32 only has Second/Millisecond units and + // Time64 only Microsecond/Nanosecond — other combinations are invalid. + DataType::Time32(unit) => { + let v = match unit { + TimeUnit::Second => col + .as_any() + .downcast_ref::() + .unwrap() + .value(row) as i64, + TimeUnit::Millisecond => col + .as_any() + .downcast_ref::() + .unwrap() + .value(row) as i64, + _ => return SqlValue::Null, + }; + SqlValue::Integer(v) + } + DataType::Time64(unit) => { + let v = match unit { + TimeUnit::Microsecond => col + .as_any() + .downcast_ref::() + .unwrap() + .value(row), + TimeUnit::Nanosecond => col + .as_any() + .downcast_ref::() + .unwrap() + .value(row), + _ => return SqlValue::Null, + }; + SqlValue::Integer(v) + } + DataType::Binary => SqlValue::Blob( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) + .to_vec(), + ), + DataType::LargeBinary => SqlValue::Blob( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) + .to_vec(), + ), DataType::List(_) | DataType::LargeList(_) => SqlValue::Text(serialize_list(col, row)), // Unreachable for any schema that passed `validate_payload_schema`; kept // as a defensive fallback rather than a panic. @@ -1417,6 +1505,98 @@ fn sql_values_to_arrow(dt: &DataType, values: Vec) -> DFResult { + let arr: arrow_array::Int8Array = values + .iter() + .map(|v| match v { + SqlValue::Integer(i) => Some(*i as i8), + _ => None, + }) + .collect(); + Arc::new(arr) + } + DataType::Int16 => { + let arr: arrow_array::Int16Array = values + .iter() + .map(|v| match v { + SqlValue::Integer(i) => Some(*i as i16), + _ => None, + }) + .collect(); + Arc::new(arr) + } + DataType::UInt8 => { + let arr: arrow_array::UInt8Array = values + .iter() + .map(|v| match v { + SqlValue::Integer(i) => Some(*i as u8), + _ => None, + }) + .collect(); + Arc::new(arr) + } + DataType::UInt16 => { + let arr: arrow_array::UInt16Array = values + .iter() + .map(|v| match v { + SqlValue::Integer(i) => Some(*i as u16), + _ => None, + }) + .collect(); + Arc::new(arr) + } + // Time32 is i32-backed (Second/Millisecond); Time64 is i64-backed + // (Microsecond/Nanosecond). Other unit pairings are invalid Arrow. + DataType::Time32(unit) => { + let it = values.iter().map(|v| match v { + SqlValue::Integer(i) => Some(*i as i32), + _ => None, + }); + match unit { + TimeUnit::Second => Arc::new(arrow_array::Time32SecondArray::from_iter(it)), + TimeUnit::Millisecond => { + Arc::new(arrow_array::Time32MillisecondArray::from_iter(it)) + } + other => Err(DataFusionError::Execution(format!( + "SqliteLookupProvider: invalid Time32 unit {other:?}" + )))?, + } + } + DataType::Time64(unit) => { + let it = values.iter().map(|v| match v { + SqlValue::Integer(i) => Some(*i), + _ => None, + }); + match unit { + TimeUnit::Microsecond => { + Arc::new(arrow_array::Time64MicrosecondArray::from_iter(it)) + } + TimeUnit::Nanosecond => Arc::new(arrow_array::Time64NanosecondArray::from_iter(it)), + other => Err(DataFusionError::Execution(format!( + "SqliteLookupProvider: invalid Time64 unit {other:?}" + )))?, + } + } + DataType::Binary => { + let mut b = arrow_array::builder::BinaryBuilder::new(); + for v in &values { + match v { + SqlValue::Blob(bytes) => b.append_value(bytes), + _ => b.append_null(), + } + } + Arc::new(b.finish()) + } + DataType::LargeBinary => { + let mut b = arrow_array::builder::LargeBinaryBuilder::new(); + for v in &values { + match v { + SqlValue::Blob(bytes) => b.append_value(bytes), + _ => b.append_null(), + } + } + Arc::new(b.finish()) + } DataType::List(item_field) => json_text_to_list::(item_field, &values)?, DataType::LargeList(item_field) => json_text_to_list::(item_field, &values)?, DataType::Float64 => { diff --git a/tests/sqlite_provider_test.rs b/tests/sqlite_provider_test.rs index b7c16e1..f4422ce 100644 --- a/tests/sqlite_provider_test.rs +++ b/tests/sqlite_provider_test.rs @@ -915,3 +915,200 @@ async fn test_basic_scalar_types_roundtrip() { .expect("rowid 1 should be present"); assert_eq!(rowid_one, (true, 19_000)); } + +/// Round-trips the remaining "basic" scalar types added in the same pass: +/// the small integers (Int8/16, UInt8/16), time-of-day (Time32/Time64), and +/// binary (Binary/LargeBinary). Each column must reconstruct with its original +/// Arrow type and values, including nulls. +#[tokio::test] +async fn test_all_basic_types_roundtrip() { + use arrow_array::{ + BinaryArray, Int8Array, Int16Array, LargeBinaryArray, Time32MillisecondArray, + Time64NanosecondArray, UInt8Array, UInt16Array, + }; + use arrow_schema::TimeUnit; + + let dir = tempdir().unwrap(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("i8", DataType::Int8, true), + Field::new("i16", DataType::Int16, true), + Field::new("u8", DataType::UInt8, true), + Field::new("u16", DataType::UInt16, true), + Field::new("t32", DataType::Time32(TimeUnit::Millisecond), true), + Field::new("t64", DataType::Time64(TimeUnit::Nanosecond), true), + Field::new("bin", DataType::Binary, true), + Field::new("lbin", DataType::LargeBinary, true), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![1_i64, 2, 3])), + Arc::new(Int8Array::from(vec![Some(-8_i8), None, Some(127)])), + Arc::new(Int16Array::from(vec![Some(-16_i16), Some(300), None])), + Arc::new(UInt8Array::from(vec![Some(8_u8), None, Some(255)])), + Arc::new(UInt16Array::from(vec![Some(16_u16), Some(65535), None])), + Arc::new(Time32MillisecondArray::from(vec![ + Some(1_000_i32), + None, + Some(86_399_000), + ])), + Arc::new(Time64NanosecondArray::from(vec![ + Some(1_i64), + Some(2), + None, + ])), + Arc::new(BinaryArray::from_opt_vec(vec![ + Some(b"\x00\x01".as_ref()), + None, + Some(b"\xff".as_ref()), + ])), + Arc::new(LargeBinaryArray::from_opt_vec(vec![ + Some(b"abc".as_ref()), + Some(b"".as_ref()), + None, + ])), + ], + ) + .unwrap(); + + let db_path = dir.path().join("allbasic.db"); + let mut builder = SqliteSidecarBuilder::begin( + db_path.to_str().unwrap(), + "models", + 2, + schema, + 0, + vec![1, 2, 3, 4, 5, 6, 7, 8], + ) + .unwrap(); + builder.push_batch(&batch).unwrap(); + let provider = builder.finish().unwrap(); + + // Fetch rows 1 and 3 and collect them keyed by rowid so assertions don't + // depend on row/batch ordering. + let batches = provider + .fetch_by_keys(&[1, 3], "rowid", None) + .await + .unwrap(); + assert_eq!(batches.iter().map(|b| b.num_rows()).sum::(), 2); + + // Every column must reconstruct with its declared Arrow type. + for b in &batches { + for f in b.schema().fields() { + assert_eq!( + b.column_by_name(f.name()).unwrap().data_type(), + f.data_type(), + "column {} reconstructed with wrong type", + f.name() + ); + } + } + + // Pull row 1's values out and check them concretely. + let (i8v, i16v, u8v, u16v, t32v, t64v, binv, lbinv) = batches + .iter() + .flat_map(|b| { + let rid = b + .column_by_name("rowid") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let i8a = b + .column_by_name("i8") + .unwrap() + .as_any() + .downcast_ref::() + .expect("i8 should reconstruct as Int8Array"); + let i16a = b + .column_by_name("i16") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let u8a = b + .column_by_name("u8") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let u16a = b + .column_by_name("u16") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let t32a = b + .column_by_name("t32") + .unwrap() + .as_any() + .downcast_ref::() + .expect("t32 should reconstruct as Time32MillisecondArray"); + let t64a = b + .column_by_name("t64") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let bina = b + .column_by_name("bin") + .unwrap() + .as_any() + .downcast_ref::() + .expect("bin should reconstruct as BinaryArray"); + let lbina = b + .column_by_name("lbin") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + (0..b.num_rows()) + .filter(|&i| rid.value(i) == 1) + .map(|i| { + ( + i8a.value(i), + i16a.value(i), + u8a.value(i), + u16a.value(i), + t32a.value(i), + t64a.value(i), + bina.value(i).to_vec(), + lbina.value(i).to_vec(), + ) + }) + .collect::>() + }) + .next() + .expect("rowid 1 should be present"); + + assert_eq!(i8v, -8); + assert_eq!(i16v, -16); + assert_eq!(u8v, 8); + assert_eq!(u16v, 16); + assert_eq!(t32v, 1_000); + assert_eq!(t64v, 1); + assert_eq!(binv, vec![0x00, 0x01]); + assert_eq!(lbinv, b"abc".to_vec()); + + // Null cells in row 3 must survive as nulls (row 3: i8=127, i16=null, + // u16=null, t64=null, lbin=null). + let row3 = provider.fetch_by_keys(&[3], "rowid", None).await.unwrap(); + let b = &row3[0]; + let i16a = b + .column_by_name("i16") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let lbina = b + .column_by_name("lbin") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert!(i16a.is_null(0), "row 3 i16 should be null"); + assert!(lbina.is_null(0), "row 3 lbin should be null"); +} From 97d40f77de5192b7aad8959a6246c8eef413b75f Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Wed, 10 Jun 2026 13:07:14 +0530 Subject: [PATCH 8/8] fix(sqlite): key off output field 0 in payload validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `validate_payload_schema` was passed `begin`'s `key_col_index`, but that indexes the input batch while the function skips that position in the *output* schema — whose key is always field 0. For any `key_col_index != 0` this skipped a real payload column (leaving an unsupported type to fail at query time, the exact defect this guards against) and validated the key column instead. Drop the parameter and always skip output field 0, matching how `finish()` and `open_or_build` derive the key from `schema.field(0)`. Add a regression test driving `begin` with a non-zero `key_col_index` and an unsupported payload column. --- src/sqlite_provider.rs | 17 +++++++++++------ tests/sqlite_provider_test.rs | 31 +++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/src/sqlite_provider.rs b/src/sqlite_provider.rs index 1b5e888..ea083b8 100644 --- a/src/sqlite_provider.rs +++ b/src/sqlite_provider.rs @@ -213,7 +213,7 @@ impl SqliteLookupProvider { "pool_size must be at least 1".into(), )); } - validate_payload_schema(&schema, 0)?; + validate_payload_schema(&schema)?; let conn = open_conn(db_path)?; let table_exists: bool = conn @@ -337,7 +337,7 @@ impl SqliteSidecarBuilder { value_col_indices.len() ))); } - validate_payload_schema(&schema, key_col_index)?; + validate_payload_schema(&schema)?; let (create_sql, insert_sql) = ddl(table_name, &schema); let conn = open_conn(db_path)?; // Manual BEGIN/COMMIT rather than a borrowed `Transaction` so the @@ -1037,11 +1037,16 @@ fn list_item_type_supported(dt: &DataType) -> bool { /// reconstruct. Called from every build entry point so the failure surfaces at /// index-creation time with the offending column named, rather than as a /// `unsupported Arrow type` error on the first query against a sidecar that -/// "built successfully". `key_col_index` is skipped — keys are validated -/// separately by `extract_key`. -fn validate_payload_schema(schema: &SchemaRef, key_col_index: usize) -> DFResult<()> { +/// "built successfully". +/// +/// Field 0 of the *output* schema is always the key column — both `finish()` and +/// `open_or_build` derive the key name from `schema.field(0)` — so it is skipped +/// here and validated separately by `extract_key`. (Note this is the output +/// schema's key position, distinct from `begin`'s `key_col_index`, which indexes +/// the input batch.) +fn validate_payload_schema(schema: &SchemaRef) -> DFResult<()> { for (i, field) in schema.fields().iter().enumerate() { - if i == key_col_index { + if i == 0 { continue; } if !payload_type_supported(field.data_type()) { diff --git a/tests/sqlite_provider_test.rs b/tests/sqlite_provider_test.rs index f4422ce..5480ee7 100644 --- a/tests/sqlite_provider_test.rs +++ b/tests/sqlite_provider_test.rs @@ -1112,3 +1112,34 @@ async fn test_all_basic_types_roundtrip() { assert!(i16a.is_null(0), "row 3 i16 should be null"); assert!(lbina.is_null(0), "row 3 lbin should be null"); } + +/// The output schema's key is always field 0, but `begin`'s `key_col_index` +/// indexes the *input batch* and may be non-zero (e.g. a storage engine whose +/// key is not the first batch column). Validation must still key off output +/// field 0 — otherwise a non-zero `key_col_index` would skip a real payload +/// column (leaving its unsupported type to fail at query time) and validate the +/// key column instead. This locks that in. +#[tokio::test] +async fn test_unsupported_payload_rejected_with_nonzero_key_index() { + let dir = tempdir().unwrap(); + + // Output schema: key at field 0, an unsupported Decimal payload at field 1. + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("price", DataType::Decimal128(10, 2), true), + ])); + + // key_col_index = 1 (key is the second column of the input batch). A bug that + // skipped output field `key_col_index` would skip "price" and wrongly pass. + let db_path = dir.path().join("nonzero_key.db"); + let err = + SqliteSidecarBuilder::begin(db_path.to_str().unwrap(), "models", 2, schema, 1, vec![0]) + .map(|_| ()) + .expect_err("unsupported payload must be rejected regardless of key_col_index"); + + let msg = err.to_string(); + assert!( + msg.contains("price") && msg.contains("unsupported type"), + "error should name the payload column, got: {msg}" + ); +}