Skip to content

Commit 9248798

Browse files
committed
fix: prevent Firo sync stall and enable resumable anon set downloads
Two root issues fixed:

1. Sync stall at ~65% requiring force-close: When any sub-operation during _refresh() hangs, refreshMutex is held forever. Added a 5-minute master timeout that guarantees mutex release. Fixed progress to fire 0.65 after updateUTXOs() completes.

2. Anonymity set downloads restart from scratch on interruption: All sectors were accumulated in memory and written to SQLite only after the entire download completed. Now each sector is persisted immediately. On resume, only remaining sectors are fetched.

Key design decisions verified against firod source (firoorg/firo@ccaf130):

- API uses absolute indices (0 = newest coin, counting backwards)
- blockHash pins the iteration start point (stable indices)
- Same-block resume: offset indices by prevSize to skip saved coins
- Cross-block resume: use `complete` flag on SparkSet to detect whether the previous download finished. Complete → use delta. Partial → full re-download (indices shifted, gap is unavoidable).
- INSERT OR IGNORE handles crash-recovery and cross-block overlap
- Progress uses consistent (indexOffset + fetched, meta.size)
- All groupIds processed every sync (removed skip optimization)
- Removed old all-or-nothing writer (dead code)
- Schema: added `complete` column + UNIQUE index on SparkSetCoins

https://claude.ai/code/session_01GF78pBWxrpN9rfsLEEwbMR
1 parent 243015b commit 9248798

File tree

8 files changed

+276
-139
lines changed

8 files changed

+276
-139
lines changed

lib/db/sqlite/firo_cache.dart

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,24 @@ abstract class _FiroCache {
8484
sparkSetCacheFile.path,
8585
mode: OpenMode.readWrite,
8686
);
87+
88+
// Migrations: safe to run on every startup (IF NOT EXISTS / IF NOT).
89+
_setCacheDB[network]!.execute("""
90+
CREATE UNIQUE INDEX IF NOT EXISTS idx_sparksetcoins_set_coin
91+
ON SparkSetCoins(setId, coinId);
92+
""");
93+
94+
// Add `complete` column to SparkSet for tracking whether a download
95+
// finished. Existing rows default to 1 (complete) since the old
96+
// all-or-nothing writer only saved on full completion.
97+
try {
98+
_setCacheDB[network]!.execute("""
99+
ALTER TABLE SparkSet ADD COLUMN complete INTEGER NOT NULL DEFAULT 1;
100+
""");
101+
} catch (_) {
102+
// Column already exists — safe to ignore.
103+
}
104+
87105
_usedTagsCacheDB[network] = sqlite3.open(
88106
sparkUsedTagsCacheFile.path,
89107
mode: OpenMode.readWrite,
@@ -133,6 +151,7 @@ abstract class _FiroCache {
133151
setHash TEXT NOT NULL,
134152
groupId INTEGER NOT NULL,
135153
size INTEGER NOT NULL,
154+
complete INTEGER NOT NULL DEFAULT 0,
136155
UNIQUE (blockHash, setHash, groupId)
137156
);
138157

lib/db/sqlite/firo_cache_coordinator.dart

Lines changed: 84 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -104,52 +104,117 @@ abstract class FiroCacheCoordinator {
104104

105105
progressUpdated?.call(prevSize, meta.size);
106106

107-
if (prevMeta?.blockHash == meta.blockHash) {
108-
Logging.instance.d("prevMeta?.blockHash == meta.blockHash");
107+
if (prevMeta?.blockHash == meta.blockHash &&
108+
prevMeta!.size >= meta.size) {
109+
Logging.instance.d(
110+
"prevMeta matches meta blockHash and size >= meta.size, "
111+
"already up to date",
112+
);
109113
return;
110114
}
111115

112-
final numberOfCoinsToFetch = meta.size - prevSize;
116+
// When resuming a partial download of the SAME block, we can skip
117+
// already-saved coins because the index space hasn't shifted.
118+
//
119+
// When the block changes, we check the `complete` flag on the
120+
// previous SparkSet to determine if the old download finished.
121+
// - Complete: use the delta (meta.size - prevSize) from index 0.
122+
// The newest coins in the new block are at the lowest indices.
123+
// - Partial: indices have shifted due to the new block, so we
124+
// can't reliably compute which coins are missing. Re-download
125+
// the full set from index 0. INSERT OR IGNORE handles overlap.
126+
final bool sameBlock = prevMeta?.blockHash == meta.blockHash;
127+
128+
final int numberOfCoinsToFetch;
129+
final int indexOffset;
130+
131+
if (sameBlock) {
132+
// Same block: resume from where we left off.
133+
numberOfCoinsToFetch = meta.size - prevSize;
134+
indexOffset = prevSize;
135+
} else if (prevMeta != null && prevMeta.complete) {
136+
// Different block, but previous download was complete.
137+
// The delta coins are at indices 0..(meta.size - prevSize - 1).
138+
numberOfCoinsToFetch = meta.size - prevSize;
139+
indexOffset = 0;
140+
} else {
141+
// Different block and previous download was partial (or no
142+
// previous data). Must re-download the full set.
143+
numberOfCoinsToFetch = meta.size;
144+
indexOffset = 0;
145+
}
146+
147+
if (numberOfCoinsToFetch <= 0) {
148+
// Edge case: reorg, stale cache, or already up to date.
149+
return;
150+
}
113151

114152
final fullSectorCount = numberOfCoinsToFetch ~/ sectorSize;
115153
final remainder = numberOfCoinsToFetch % sectorSize;
116154

117-
final List<dynamic> coins = [];
155+
int coinsSaved = 0;
118156

119157
for (int i = 0; i < fullSectorCount; i++) {
120-
final start = (i * sectorSize);
158+
final start = indexOffset + (i * sectorSize);
121159
final data = await client.getSparkAnonymitySetBySector(
122160
coinGroupId: groupId,
123161
latestBlock: meta.blockHash,
124162
startIndex: start,
125163
endIndex: start + sectorSize,
126164
);
127-
progressUpdated?.call(start + sectorSize, numberOfCoinsToFetch);
128165

129-
coins.addAll(data);
166+
final sectorCoins =
167+
data
168+
.map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId))
169+
.toList();
170+
171+
coinsSaved += sectorCoins.length;
172+
173+
await _workers[network]!.runTask(
174+
FCTask(
175+
func: FCFuncName._insertSparkAnonSetCoinsIncremental,
176+
data: (meta, sectorCoins, indexOffset + coinsSaved),
177+
),
178+
);
179+
180+
progressUpdated?.call(
181+
indexOffset + (i + 1) * sectorSize,
182+
meta.size,
183+
);
130184
}
131185

132186
if (remainder > 0) {
187+
final remainderStart = indexOffset + numberOfCoinsToFetch - remainder;
133188
final data = await client.getSparkAnonymitySetBySector(
134189
coinGroupId: groupId,
135190
latestBlock: meta.blockHash,
136-
startIndex: numberOfCoinsToFetch - remainder,
137-
endIndex: numberOfCoinsToFetch,
191+
startIndex: remainderStart,
192+
endIndex: indexOffset + numberOfCoinsToFetch,
138193
);
139-
progressUpdated?.call(numberOfCoinsToFetch, numberOfCoinsToFetch);
140194

141-
coins.addAll(data);
142-
}
195+
final sectorCoins =
196+
data
197+
.map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId))
198+
.toList();
143199

144-
final result =
145-
coins
146-
.map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId))
147-
.toList();
200+
coinsSaved += sectorCoins.length;
201+
202+
await _workers[network]!.runTask(
203+
FCTask(
204+
func: FCFuncName._insertSparkAnonSetCoinsIncremental,
205+
data: (meta, sectorCoins, indexOffset + coinsSaved),
206+
),
207+
);
208+
209+
progressUpdated?.call(meta.size, meta.size);
210+
}
148211

212+
// Mark this SparkSet as complete so cross-block resume knows
213+
// the download finished and can safely use the delta approach.
149214
await _workers[network]!.runTask(
150215
FCTask(
151-
func: FCFuncName._updateSparkAnonSetCoinsWith,
152-
data: (meta, result),
216+
func: FCFuncName._markSparkAnonSetComplete,
217+
data: meta,
153218
),
154219
);
155220
});
@@ -268,6 +333,7 @@ abstract class FiroCacheCoordinator {
268333
blockHash: result.first["blockHash"] as String,
269334
setHash: result.first["setHash"] as String,
270335
size: result.first["size"] as int,
336+
complete: (result.first["complete"] as int) == 1,
271337
);
272338
}
273339

lib/db/sqlite/firo_cache_reader.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ abstract class _Reader {
2525
required Database db,
2626
}) async {
2727
final query = """
28-
SELECT ss.blockHash, ss.setHash, ss.size
28+
SELECT ss.blockHash, ss.setHash, ss.size, ss.complete
2929
FROM SparkSet ss
3030
WHERE ss.groupId = $groupId
3131
ORDER BY ss.size DESC

lib/db/sqlite/firo_cache_worker.dart

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
part of 'firo_cache.dart';
22

33
enum FCFuncName {
4-
_updateSparkAnonSetCoinsWith,
4+
_insertSparkAnonSetCoinsIncremental,
5+
_markSparkAnonSetComplete,
56
_updateSparkUsedTagsWith,
67
}
78

@@ -93,13 +94,22 @@ class _FiroCacheWorker {
9394
try {
9495
final FCResult result;
9596
switch (task.func) {
96-
case FCFuncName._updateSparkAnonSetCoinsWith:
97+
case FCFuncName._insertSparkAnonSetCoinsIncremental:
9798
final data =
98-
task.data as (SparkAnonymitySetMeta, List<RawSparkCoin>);
99-
result = _updateSparkAnonSetCoinsWith(
99+
task.data
100+
as (SparkAnonymitySetMeta, List<RawSparkCoin>, int);
101+
result = _insertSparkAnonSetCoinsIncremental(
100102
setCacheDb,
101103
data.$2,
102104
data.$1,
105+
data.$3,
106+
);
107+
break;
108+
109+
case FCFuncName._markSparkAnonSetComplete:
110+
result = _markSparkAnonSetComplete(
111+
setCacheDb,
112+
task.data as SparkAnonymitySetMeta,
103113
);
104114
break;
105115

lib/db/sqlite/firo_cache_writer.dart

Lines changed: 65 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -45,68 +45,89 @@ FCResult _updateSparkUsedTagsWith(Database db, List<List<dynamic>> tags) {
4545
}
4646

4747
// ===========================================================================
48-
// ================== write to spark anon set cache ==========================
48+
// =========== incremental write to spark anon set cache ====================
4949

50-
/// update the sqlite cache
50+
/// Persist a single sector's worth of coins to the cache, creating or
51+
/// updating the SparkSet row as needed. Safe to call repeatedly — uses
52+
/// INSERT OR IGNORE so duplicate coins (from crash-recovery reruns) are
53+
/// silently skipped.
5154
///
52-
/// returns true if successful, otherwise false
53-
FCResult _updateSparkAnonSetCoinsWith(
55+
/// [cumulativeSize] should be prevSize + total coins saved so far (including
56+
/// this batch). It is written to SparkSet.size so that on resume,
57+
/// getLatestSetInfoForGroupId returns the correct partial progress.
58+
FCResult _insertSparkAnonSetCoinsIncremental(
5459
Database db,
5560
final List<RawSparkCoin> coinsRaw,
5661
SparkAnonymitySetMeta meta,
62+
int cumulativeSize,
5763
) {
5864
if (coinsRaw.isEmpty) {
59-
// no coins to actually insert
60-
return FCResult(success: true);
61-
}
62-
63-
final checkResult = db.select(
64-
"""
65-
SELECT *
66-
FROM SparkSet
67-
WHERE blockHash = ? AND setHash = ? AND groupId = ?;
68-
""",
69-
[meta.blockHash, meta.setHash, meta.coinGroupId],
70-
);
71-
72-
if (checkResult.isNotEmpty) {
73-
// already up to date
7465
return FCResult(success: true);
7566
}
7667

7768
final coins = coinsRaw.reversed;
7869

7970
db.execute("BEGIN;");
8071
try {
72+
// Create SparkSet row if it doesn't exist yet for this block state.
73+
// complete = 0 marks this as an in-progress download.
8174
db.execute(
8275
"""
83-
INSERT INTO SparkSet (blockHash, setHash, groupId, size)
84-
VALUES (?, ?, ?, ?);
76+
INSERT OR IGNORE INTO SparkSet (blockHash, setHash, groupId, size, complete)
77+
VALUES (?, ?, ?, 0, 0);
78+
""",
79+
[meta.blockHash, meta.setHash, meta.coinGroupId],
80+
);
81+
82+
// Get the SparkSet row's id (whether just created or already existing).
83+
final setIdResult = db.select(
84+
"""
85+
SELECT id FROM SparkSet
86+
WHERE blockHash = ? AND setHash = ? AND groupId = ?;
8587
""",
86-
[meta.blockHash, meta.setHash, meta.coinGroupId, meta.size],
88+
[meta.blockHash, meta.setHash, meta.coinGroupId],
8789
);
88-
final setId = db.lastInsertRowId;
90+
final setId = setIdResult.first["id"] as int;
8991

9092
for (final coin in coins) {
93+
// INSERT OR IGNORE handles duplicates from crash-recovery reruns.
9194
db.execute(
9295
"""
93-
INSERT INTO SparkCoin (serialized, txHash, context, groupId)
94-
VALUES (?, ?, ?, ?);
95-
""",
96+
INSERT OR IGNORE INTO SparkCoin (serialized, txHash, context, groupId)
97+
VALUES (?, ?, ?, ?);
98+
""",
9699
[coin.serialized, coin.txHash, coin.context, coin.groupId],
97100
);
98-
final coinId = db.lastInsertRowId;
99101

100-
// finally add the row id to the newly added set
102+
// lastInsertRowId is 0 when INSERT OR IGNORE skips a duplicate,
103+
// so we must SELECT explicitly.
104+
final coinIdResult = db.select(
105+
"""
106+
SELECT id FROM SparkCoin
107+
WHERE serialized = ? AND txHash = ? AND context = ? AND groupId = ?;
108+
""",
109+
[coin.serialized, coin.txHash, coin.context, coin.groupId],
110+
);
111+
final coinId = coinIdResult.first["id"] as int;
112+
101113
db.execute(
102114
"""
103-
INSERT INTO SparkSetCoins (setId, coinId)
115+
INSERT OR IGNORE INTO SparkSetCoins (setId, coinId)
104116
VALUES (?, ?);
105117
""",
106118
[setId, coinId],
107119
);
108120
}
109121

122+
// Update cumulative size to track partial progress.
123+
db.execute(
124+
"""
125+
UPDATE SparkSet SET size = ?
126+
WHERE id = ?;
127+
""",
128+
[cumulativeSize, setId],
129+
);
130+
110131
db.execute("COMMIT;");
111132

112133
return FCResult(success: true);
@@ -115,3 +136,18 @@ FCResult _updateSparkAnonSetCoinsWith(
115136
return FCResult(success: false, error: e);
116137
}
117138
}
139+
140+
/// Mark a SparkSet row as complete after all sectors have been downloaded.
141+
FCResult _markSparkAnonSetComplete(
142+
Database db,
143+
SparkAnonymitySetMeta meta,
144+
) {
145+
db.execute(
146+
"""
147+
UPDATE SparkSet SET complete = 1
148+
WHERE blockHash = ? AND setHash = ? AND groupId = ?;
149+
""",
150+
[meta.blockHash, meta.setHash, meta.coinGroupId],
151+
);
152+
return FCResult(success: true);
153+
}

lib/models/electrumx_response/spark_models.dart

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@ class SparkAnonymitySetMeta {
3030
final String blockHash;
3131
final String setHash;
3232
final int size;
33+
final bool complete;
3334

3435
SparkAnonymitySetMeta({
3536
required this.coinGroupId,
3637
required this.blockHash,
3738
required this.setHash,
3839
required this.size,
40+
this.complete = false,
3941
});
4042

4143
@override
@@ -44,7 +46,8 @@ class SparkAnonymitySetMeta {
4446
"coinGroupId: $coinGroupId, "
4547
"blockHash: $blockHash, "
4648
"setHash: $setHash, "
47-
"size: $size"
49+
"size: $size, "
50+
"complete: $complete"
4851
"}";
4952
}
5053
}

0 commit comments

Comments (0)