-
Notifications
You must be signed in to change notification settings - Fork 0
fix(sqlite-provider): use caller-provided key column name #9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,9 +2,12 @@ | |
| // | ||
| // Stores all non-embedding columns in a local SQLite database (bundled libsqlite3). | ||
| // Scalar columns map to INTEGER/TEXT/REAL; list columns are serialised as JSON TEXT. | ||
| // Lookups use `WHERE row_idx IN (?, ...)` against the INTEGER PRIMARY KEY B-tree. | ||
| // Lookups use `WHERE <key_col> IN (?, ...)` against the INTEGER PRIMARY KEY B-tree. | ||
| // | ||
| // Schema: row_idx INTEGER PRIMARY KEY, <col> TEXT/INTEGER/REAL, ... | ||
| // Schema: <key_col> INTEGER PRIMARY KEY, <col> TEXT/INTEGER/REAL, ... | ||
| // | ||
| // The key column name is caller-provided (e.g. "_key") and must match the first | ||
| // field in the schema passed to `open_or_build`. | ||
| // | ||
| // Persistence: the database is written once to the given path and reused on | ||
| // subsequent runs. The first build reads all parquet files and inserts rows | ||
|
|
@@ -42,6 +45,7 @@ use crate::lookup::PointLookupProvider; | |
| pub struct SqliteLookupProvider { | ||
| schema: SchemaRef, | ||
| table_name: String, | ||
| key_col: String, | ||
| pool: Arc<Mutex<Vec<Connection>>>, | ||
| sem: Arc<Semaphore>, | ||
| } | ||
|
|
@@ -117,6 +121,8 @@ impl SqliteLookupProvider { | |
| schema: SchemaRef, | ||
| parquet_col_indices: &[usize], | ||
| ) -> DFResult<Self> { | ||
| // The first field in the schema is the key column (INTEGER PRIMARY KEY). | ||
| let key_col = schema.field(0).name().clone(); | ||
| if pool_size == 0 { | ||
| return Err(DataFusionError::Execution( | ||
| "pool_size must be at least 1".into(), | ||
|
|
@@ -167,6 +173,7 @@ impl SqliteLookupProvider { | |
| Ok(Self { | ||
| schema, | ||
| table_name: table_name.to_string(), | ||
| key_col, | ||
| pool: Arc::new(Mutex::new(conns)), | ||
| sem: Arc::new(Semaphore::new(pool_size)), | ||
| }) | ||
|
|
@@ -202,6 +209,7 @@ impl PointLookupProvider for SqliteLookupProvider { | |
| let keys_vec = keys.to_vec(); | ||
| let pool = self.pool.clone(); | ||
| let table_name = self.table_name.clone(); | ||
| let key_col = self.key_col.clone(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Non-blocking suggestion: |
||
|
|
||
| // Acquire a semaphore permit to bound concurrency to the pool size, | ||
| // then run the synchronous SQLite query on a blocking thread. | ||
|
|
@@ -227,6 +235,7 @@ impl PointLookupProvider for SqliteLookupProvider { | |
| &keys_vec, | ||
| &out_schema, | ||
| &table_name, | ||
| &key_col, | ||
| ); | ||
| drop(guard); // explicit but not required — Drop handles it | ||
| res | ||
|
|
@@ -243,6 +252,7 @@ fn execute_query_sync( | |
| keys: &[u64], | ||
| out_schema: &SchemaRef, | ||
| table_name: &str, | ||
| key_col: &str, | ||
| ) -> DFResult<Vec<RecordBatch>> { | ||
| let placeholders = keys.iter().map(|_| "?").collect::<Vec<_>>().join(", "); | ||
| // Select only the columns in out_schema (already projection-applied by the | ||
|
|
@@ -253,8 +263,9 @@ fn execute_query_sync( | |
| .map(|f| quote_ident(f.name())) | ||
| .collect::<Vec<_>>() | ||
| .join(", "); | ||
| let qk = quote_ident(key_col); | ||
| let sql = format!( | ||
| "SELECT {col_list} FROM {tn} WHERE row_idx IN ({placeholders}) ORDER BY row_idx", | ||
| "SELECT {col_list} FROM {tn} WHERE {qk} IN ({placeholders}) ORDER BY {qk}", | ||
| tn = quote_ident(table_name) | ||
| ); | ||
|
|
||
|
|
@@ -586,14 +597,16 @@ fn build_table( | |
| schema: &SchemaRef, | ||
| parquet_col_indices: &[usize], | ||
| ) -> DFResult<()> { | ||
| // The first field is the key column (INTEGER PRIMARY KEY). | ||
| let key_col_name = schema.field(0).name(); | ||
| let col_defs = schema | ||
| .fields() | ||
| .iter() | ||
| .map(|f| { | ||
| let sql_type = arrow_type_to_sql(f.data_type()); | ||
| if f.name() == "row_idx" { | ||
| "row_idx INTEGER PRIMARY KEY".to_string() | ||
| if f.name() == key_col_name { | ||
| format!("{} INTEGER PRIMARY KEY", quote_ident(f.name())) | ||
| } else { | ||
| let sql_type = arrow_type_to_sql(f.data_type()); | ||
| format!("{} {}", quote_ident(f.name()), sql_type) | ||
| } | ||
| }) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: all existing tests happen to use `row_idx` as the key column name, so the only coverage here is "dynamic name produces the same SQL as the old hardcoded string." Consider adding a test in `sqlite_provider_test.rs` that builds a provider with a schema whose first field is named something other than `row_idx` (e.g. `_key`) and confirms that `fetch_by_keys` returns the correct rows. That would give you a direct regression guard for the actual bug scenario.

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done — added `test_custom_key_column_name`, which uses `_key` as the key column name.