Skip to content

Commit d66d2c2

Browse files
committed
fix: prevent Firo sync stall and enable resumable anon set downloads
Two root issues fixed:

1. Sync stall at ~65% requiring force-close: when any sub-operation during `_refresh()` hangs, `refreshMutex` is held forever. Added a 5-minute master timeout that guarantees mutex release. Fixed progress to fire 0.65 after `updateUTXOs()` completes.

2. Anonymity set downloads restart from scratch on interruption: all sectors were accumulated in memory and written to SQLite only after the entire download completed. Now each sector is persisted immediately. On resume, only the remaining sectors are fetched.

Verified against firod source (firoorg/firo@ccaf130):
- The API uses absolute indices (0 = newest coin, counting backwards).
- `blockHash` pins the iteration start point (stable indices).
- Sector indices are offset by `prevSize` when resuming the same block.
- `INSERT OR IGNORE` handles crash-recovery duplicates.
- The progress callback uses a consistent `(prevSize + fetched, meta.size)`.
- All groupIds are processed on every sync (removed the `checkSetInfoForGroupIdExists` skip, which would leave partial downloads incomplete).
- Removed the old all-or-nothing writer (dead code after this change).
- Added a UNIQUE index on `SparkSetCoins(setId, coinId)` — existing data has no duplicates, so `CREATE UNIQUE INDEX IF NOT EXISTS` succeeds.

https://claude.ai/code/session_01GF78pBWxrpN9rfsLEEwbMR
1 parent 243015b commit d66d2c2

File tree

6 files changed

+211
-140
lines changed

6 files changed

+211
-140
lines changed

lib/db/sqlite/firo_cache.dart

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,14 @@ abstract class _FiroCache {
8484
sparkSetCacheFile.path,
8585
mode: OpenMode.readWrite,
8686
);
87+
88+
// Ensure unique index exists for incremental sector writes.
89+
// Safe to run on every startup (IF NOT EXISTS).
90+
_setCacheDB[network]!.execute("""
91+
CREATE UNIQUE INDEX IF NOT EXISTS idx_sparksetcoins_set_coin
92+
ON SparkSetCoins(setId, coinId);
93+
""");
94+
8795
_usedTagsCacheDB[network] = sqlite3.open(
8896
sparkUsedTagsCacheFile.path,
8997
mode: OpenMode.readWrite,

lib/db/sqlite/firo_cache_coordinator.dart

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -104,54 +104,92 @@ abstract class FiroCacheCoordinator {
104104

105105
progressUpdated?.call(prevSize, meta.size);
106106

107-
if (prevMeta?.blockHash == meta.blockHash) {
108-
Logging.instance.d("prevMeta?.blockHash == meta.blockHash");
107+
if (prevMeta?.blockHash == meta.blockHash &&
108+
prevMeta!.size >= meta.size) {
109+
Logging.instance.d(
110+
"prevMeta matches meta blockHash and size >= meta.size, "
111+
"already up to date",
112+
);
109113
return;
110114
}
111115

112116
final numberOfCoinsToFetch = meta.size - prevSize;
113117

118+
if (numberOfCoinsToFetch <= 0) {
119+
// Edge case: reorg or stale cache.
120+
return;
121+
}
122+
123+
// When resuming a partial download of the same block, sector indices
124+
// must be offset because the API uses absolute indices (0 = newest
125+
// coin, counting backward through blocks). Coins at indices
126+
// 0..prevSize-1 are already saved, so we continue from prevSize.
127+
//
128+
// For a new block (prevMeta.blockHash != meta.blockHash), the delta
129+
// starts at index 0 since the newest coins are the ones we don't
130+
// have yet.
131+
final int indexOffset =
132+
(prevMeta?.blockHash == meta.blockHash) ? prevSize : 0;
133+
114134
final fullSectorCount = numberOfCoinsToFetch ~/ sectorSize;
115135
final remainder = numberOfCoinsToFetch % sectorSize;
116136

117-
final List<dynamic> coins = [];
137+
int coinsSaved = 0;
118138

119139
for (int i = 0; i < fullSectorCount; i++) {
120-
final start = (i * sectorSize);
140+
final start = indexOffset + (i * sectorSize);
121141
final data = await client.getSparkAnonymitySetBySector(
122142
coinGroupId: groupId,
123143
latestBlock: meta.blockHash,
124144
startIndex: start,
125145
endIndex: start + sectorSize,
126146
);
127-
progressUpdated?.call(start + sectorSize, numberOfCoinsToFetch);
128147

129-
coins.addAll(data);
148+
final sectorCoins =
149+
data
150+
.map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId))
151+
.toList();
152+
153+
coinsSaved += sectorCoins.length;
154+
155+
await _workers[network]!.runTask(
156+
FCTask(
157+
func: FCFuncName._insertSparkAnonSetCoinsIncremental,
158+
data: (meta, sectorCoins, prevSize + coinsSaved),
159+
),
160+
);
161+
162+
progressUpdated?.call(
163+
prevSize + (i + 1) * sectorSize,
164+
meta.size,
165+
);
130166
}
131167

132168
if (remainder > 0) {
169+
final remainderStart = indexOffset + numberOfCoinsToFetch - remainder;
133170
final data = await client.getSparkAnonymitySetBySector(
134171
coinGroupId: groupId,
135172
latestBlock: meta.blockHash,
136-
startIndex: numberOfCoinsToFetch - remainder,
137-
endIndex: numberOfCoinsToFetch,
173+
startIndex: remainderStart,
174+
endIndex: indexOffset + numberOfCoinsToFetch,
138175
);
139-
progressUpdated?.call(numberOfCoinsToFetch, numberOfCoinsToFetch);
140176

141-
coins.addAll(data);
142-
}
177+
final sectorCoins =
178+
data
179+
.map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId))
180+
.toList();
143181

144-
final result =
145-
coins
146-
.map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId))
147-
.toList();
182+
coinsSaved += sectorCoins.length;
148183

149-
await _workers[network]!.runTask(
150-
FCTask(
151-
func: FCFuncName._updateSparkAnonSetCoinsWith,
152-
data: (meta, result),
153-
),
154-
);
184+
await _workers[network]!.runTask(
185+
FCTask(
186+
func: FCFuncName._insertSparkAnonSetCoinsIncremental,
187+
data: (meta, sectorCoins, prevSize + coinsSaved),
188+
),
189+
);
190+
191+
progressUpdated?.call(meta.size, meta.size);
192+
}
155193
});
156194
}
157195

lib/db/sqlite/firo_cache_worker.dart

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
part of 'firo_cache.dart';
22

33
enum FCFuncName {
4-
_updateSparkAnonSetCoinsWith,
4+
_insertSparkAnonSetCoinsIncremental,
55
_updateSparkUsedTagsWith,
66
}
77

@@ -93,13 +93,15 @@ class _FiroCacheWorker {
9393
try {
9494
final FCResult result;
9595
switch (task.func) {
96-
case FCFuncName._updateSparkAnonSetCoinsWith:
96+
case FCFuncName._insertSparkAnonSetCoinsIncremental:
9797
final data =
98-
task.data as (SparkAnonymitySetMeta, List<RawSparkCoin>);
99-
result = _updateSparkAnonSetCoinsWith(
98+
task.data
99+
as (SparkAnonymitySetMeta, List<RawSparkCoin>, int);
100+
result = _insertSparkAnonSetCoinsIncremental(
100101
setCacheDb,
101102
data.$2,
102103
data.$1,
104+
data.$3,
103105
);
104106
break;
105107

lib/db/sqlite/firo_cache_writer.dart

Lines changed: 49 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -45,68 +45,88 @@ FCResult _updateSparkUsedTagsWith(Database db, List<List<dynamic>> tags) {
4545
}
4646

4747
// ===========================================================================
48-
// ================== write to spark anon set cache ==========================
48+
// =========== incremental write to spark anon set cache ====================
4949

50-
/// update the sqlite cache
50+
/// Persist a single sector's worth of coins to the cache, creating or
51+
/// updating the SparkSet row as needed. Safe to call repeatedly — uses
52+
/// INSERT OR IGNORE so duplicate coins (from crash-recovery reruns) are
53+
/// silently skipped.
5154
///
52-
/// returns true if successful, otherwise false
53-
FCResult _updateSparkAnonSetCoinsWith(
55+
/// [cumulativeSize] should be prevSize + total coins saved so far (including
56+
/// this batch). It is written to SparkSet.size so that on resume,
57+
/// getLatestSetInfoForGroupId returns the correct partial progress.
58+
FCResult _insertSparkAnonSetCoinsIncremental(
5459
Database db,
5560
final List<RawSparkCoin> coinsRaw,
5661
SparkAnonymitySetMeta meta,
62+
int cumulativeSize,
5763
) {
5864
if (coinsRaw.isEmpty) {
59-
// no coins to actually insert
60-
return FCResult(success: true);
61-
}
62-
63-
final checkResult = db.select(
64-
"""
65-
SELECT *
66-
FROM SparkSet
67-
WHERE blockHash = ? AND setHash = ? AND groupId = ?;
68-
""",
69-
[meta.blockHash, meta.setHash, meta.coinGroupId],
70-
);
71-
72-
if (checkResult.isNotEmpty) {
73-
// already up to date
7465
return FCResult(success: true);
7566
}
7667

7768
final coins = coinsRaw.reversed;
7869

7970
db.execute("BEGIN;");
8071
try {
72+
// Create SparkSet row if it doesn't exist yet for this block state.
8173
db.execute(
8274
"""
83-
INSERT INTO SparkSet (blockHash, setHash, groupId, size)
84-
VALUES (?, ?, ?, ?);
75+
INSERT OR IGNORE INTO SparkSet (blockHash, setHash, groupId, size)
76+
VALUES (?, ?, ?, 0);
77+
""",
78+
[meta.blockHash, meta.setHash, meta.coinGroupId],
79+
);
80+
81+
// Get the SparkSet row's id (whether just created or already existing).
82+
final setIdResult = db.select(
83+
"""
84+
SELECT id FROM SparkSet
85+
WHERE blockHash = ? AND setHash = ? AND groupId = ?;
8586
""",
86-
[meta.blockHash, meta.setHash, meta.coinGroupId, meta.size],
87+
[meta.blockHash, meta.setHash, meta.coinGroupId],
8788
);
88-
final setId = db.lastInsertRowId;
89+
final setId = setIdResult.first["id"] as int;
8990

9091
for (final coin in coins) {
92+
// INSERT OR IGNORE handles duplicates from crash-recovery reruns.
9193
db.execute(
9294
"""
93-
INSERT INTO SparkCoin (serialized, txHash, context, groupId)
94-
VALUES (?, ?, ?, ?);
95-
""",
95+
INSERT OR IGNORE INTO SparkCoin (serialized, txHash, context, groupId)
96+
VALUES (?, ?, ?, ?);
97+
""",
9698
[coin.serialized, coin.txHash, coin.context, coin.groupId],
9799
);
98-
final coinId = db.lastInsertRowId;
99100

100-
// finally add the row id to the newly added set
101+
// lastInsertRowId is 0 when INSERT OR IGNORE skips a duplicate,
102+
// so we must SELECT explicitly.
103+
final coinIdResult = db.select(
104+
"""
105+
SELECT id FROM SparkCoin
106+
WHERE serialized = ? AND txHash = ? AND context = ? AND groupId = ?;
107+
""",
108+
[coin.serialized, coin.txHash, coin.context, coin.groupId],
109+
);
110+
final coinId = coinIdResult.first["id"] as int;
111+
101112
db.execute(
102113
"""
103-
INSERT INTO SparkSetCoins (setId, coinId)
114+
INSERT OR IGNORE INTO SparkSetCoins (setId, coinId)
104115
VALUES (?, ?);
105116
""",
106117
[setId, coinId],
107118
);
108119
}
109120

121+
// Update cumulative size to track partial progress.
122+
db.execute(
123+
"""
124+
UPDATE SparkSet SET size = ?
125+
WHERE id = ?;
126+
""",
127+
[cumulativeSize, setId],
128+
);
129+
110130
db.execute("COMMIT;");
111131

112132
return FCResult(success: true);

0 commit comments

Comments (0)