Skip to content

Commit 0df2c6b

Browse files
benthecarmanclaude
andcommitted
Add PaginatedKVStore support to SqliteStore
Implement rust-lightning's PaginatedKVStore/PaginatedKVStoreSync traits on SqliteStore with a v2→v3 schema migration that adds a created_at column for tracking insertion order. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent a3c2ced commit 0df2c6b

File tree

3 files changed

+808
-11
lines changed

3 files changed

+808
-11
lines changed

src/io/sqlite_store/migrations.rs

Lines changed: 165 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,15 @@ use lightning::io;
99
use rusqlite::Connection;
1010

1111
pub(super) fn migrate_schema(
12-
connection: &mut Connection, kv_table_name: &str, from_version: u16, to_version: u16,
12+
connection: &mut Connection, kv_table_name: &str, mut from_version: u16, to_version: u16,
1313
) -> io::Result<()> {
1414
assert!(from_version < to_version);
15-
if from_version == 1 && to_version == 2 {
15+
if from_version == 1 && to_version >= 2 {
1616
migrate_v1_to_v2(connection, kv_table_name)?;
17+
from_version = 2;
18+
}
19+
if from_version == 2 && to_version >= 3 {
20+
migrate_v2_to_v3(connection, kv_table_name)?;
1721
}
1822

1923
Ok(())
@@ -65,11 +69,82 @@ fn migrate_v1_to_v2(connection: &mut Connection, kv_table_name: &str) -> io::Res
6569
Ok(())
6670
}
6771

72+
fn migrate_v2_to_v3(connection: &mut Connection, kv_table_name: &str) -> io::Result<()> {
73+
let tx = connection.transaction().map_err(|e| {
74+
let msg = format!("Failed to migrate table {} from v2 to v3: {}", kv_table_name, e);
75+
io::Error::new(io::ErrorKind::Other, msg)
76+
})?;
77+
78+
let old_table = format!("{}_v2_old", kv_table_name);
79+
let map_err = |e: rusqlite::Error| -> io::Error {
80+
let msg = format!("Failed to migrate table {} from v2 to v3: {}", kv_table_name, e);
81+
io::Error::new(io::ErrorKind::Other, msg)
82+
};
83+
84+
// Recreate the table to ensure the correct PRIMARY KEY regardless of migration history.
85+
// Tables migrated from v1 have PK (primary_namespace, key) only — missing
86+
// secondary_namespace. Recreating normalizes the schema for all databases.
87+
tx.execute(&format!("ALTER TABLE {} RENAME TO {}", kv_table_name, old_table), [])
88+
.map_err(map_err)?;
89+
90+
tx.execute(
91+
&format!(
92+
"CREATE TABLE {} (
93+
primary_namespace TEXT NOT NULL,
94+
secondary_namespace TEXT DEFAULT \"\" NOT NULL,
95+
key TEXT NOT NULL CHECK (key <> ''),
96+
value BLOB,
97+
created_at INTEGER NOT NULL DEFAULT 0,
98+
PRIMARY KEY (primary_namespace, secondary_namespace, key)
99+
)",
100+
kv_table_name
101+
),
102+
[],
103+
)
104+
.map_err(map_err)?;
105+
106+
// Copy data and backfill created_at from ROWID for relative ordering
107+
tx.execute(
108+
&format!(
109+
"INSERT INTO {} (primary_namespace, secondary_namespace, key, value, created_at)
110+
SELECT primary_namespace, secondary_namespace, key, value, ROWID FROM {}",
111+
kv_table_name, old_table
112+
),
113+
[],
114+
)
115+
.map_err(map_err)?;
116+
117+
tx.execute(&format!("DROP TABLE {}", old_table), []).map_err(map_err)?;
118+
119+
// Create composite index for paginated listing
120+
let sql = format!(
121+
"CREATE INDEX idx_{}_paginated ON {} (primary_namespace, secondary_namespace, created_at DESC, key ASC)",
122+
kv_table_name, kv_table_name
123+
);
124+
125+
tx.execute(&sql, []).map_err(map_err)?;
126+
127+
// Update user_version
128+
tx.pragma(Some(rusqlite::DatabaseName::Main), "user_version", 3u16, |_| Ok(())).map_err(
129+
|e| {
130+
let msg = format!("Failed to upgrade user_version from 2 to 3: {}", e);
131+
io::Error::new(io::ErrorKind::Other, msg)
132+
},
133+
)?;
134+
135+
tx.commit().map_err(|e| {
136+
let msg = format!("Failed to migrate table {} from v2 to v3: {}", kv_table_name, e);
137+
io::Error::new(io::ErrorKind::Other, msg)
138+
})?;
139+
140+
Ok(())
141+
}
142+
68143
#[cfg(test)]
69144
mod tests {
70145
use std::fs;
71146

72-
use lightning::util::persist::KVStoreSync;
147+
use lightning::util::persist::{KVStoreSync, PaginatedKVStoreSync};
73148
use rusqlite::{named_params, Connection};
74149

75150
use crate::io::sqlite_store::SqliteStore;
@@ -159,4 +234,91 @@ mod tests {
159234
// Check we can continue to use the store just fine.
160235
do_read_write_remove_list_persist(&store);
161236
}
237+
238+
#[test]
239+
fn rwrl_post_schema_2_migration() {
240+
let old_schema_version = 2u16;
241+
242+
let mut temp_path = random_storage_path();
243+
temp_path.push("rwrl_post_schema_2_migration");
244+
245+
let db_file_name = "test_db".to_string();
246+
let kv_table_name = "test_table".to_string();
247+
248+
let test_ns = "testspace";
249+
let test_sub = "testsub";
250+
251+
{
252+
// Create a v2 database manually
253+
fs::create_dir_all(temp_path.clone()).unwrap();
254+
let mut db_file_path = temp_path.clone();
255+
db_file_path.push(db_file_name.clone());
256+
257+
let connection = Connection::open(db_file_path.clone()).unwrap();
258+
259+
connection
260+
.pragma(
261+
Some(rusqlite::DatabaseName::Main),
262+
"user_version",
263+
old_schema_version,
264+
|_| Ok(()),
265+
)
266+
.unwrap();
267+
268+
let sql = format!(
269+
"CREATE TABLE IF NOT EXISTS {} (
270+
primary_namespace TEXT NOT NULL,
271+
secondary_namespace TEXT DEFAULT \"\" NOT NULL,
272+
key TEXT NOT NULL CHECK (key <> ''),
273+
value BLOB, PRIMARY KEY ( primary_namespace, secondary_namespace, key )
274+
);",
275+
kv_table_name
276+
);
277+
connection.execute(&sql, []).unwrap();
278+
279+
// Insert 3 rows in a known order
280+
for i in 0..3 {
281+
let key = format!("key_{}", i);
282+
let sql = format!(
283+
"INSERT INTO {} (primary_namespace, secondary_namespace, key, value) VALUES (:ns, :sub, :key, :value);",
284+
kv_table_name
285+
);
286+
let mut stmt = connection.prepare_cached(&sql).unwrap();
287+
stmt.execute(named_params! {
288+
":ns": test_ns,
289+
":sub": test_sub,
290+
":key": key,
291+
":value": vec![i as u8; 8],
292+
})
293+
.unwrap();
294+
}
295+
}
296+
297+
// Open with new code, triggering v2→v3 migration
298+
let store = SqliteStore::new(temp_path, Some(db_file_name), Some(kv_table_name)).unwrap();
299+
300+
// Verify data survived
301+
for i in 0..3 {
302+
let key = format!("key_{}", i);
303+
let data = store.read(test_ns, test_sub, &key).unwrap();
304+
assert_eq!(data, vec![i as u8; 8]);
305+
}
306+
307+
// Verify paginated listing works and returns entries in ROWID-backfilled order (newest first)
308+
let response =
309+
PaginatedKVStoreSync::list_paginated(&store, test_ns, test_sub, None).unwrap();
310+
assert_eq!(response.keys.len(), 3);
311+
// ROWIDs were 1, 2, 3 so created_at was backfilled as 1, 2, 3
312+
// Newest first: key_2, key_1, key_0
313+
assert_eq!(response.keys, vec!["key_2", "key_1", "key_0"]);
314+
315+
// Verify we can write new entries and they get proper ordering
316+
KVStoreSync::write(&store, test_ns, test_sub, "key_new", vec![99u8; 8]).unwrap();
317+
let response =
318+
PaginatedKVStoreSync::list_paginated(&store, test_ns, test_sub, None).unwrap();
319+
assert_eq!(response.keys[0], "key_new");
320+
321+
// Check we can continue to use the store just fine.
322+
do_read_write_remove_list_persist(&store);
323+
}
162324
}

0 commit comments

Comments
 (0)