From 92487980cedaefbd7b8536477c47f5e35812fb7c Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 7 Apr 2026 21:21:42 +0000 Subject: [PATCH] fix: prevent Firo sync stall and enable resumable anon set downloads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two root issues fixed: 1. Sync stall at ~65% requiring force-close: When any sub-operation during _refresh() hangs, refreshMutex is held forever. Added a 5-minute master timeout that guarantees mutex release. Fixed progress to fire 0.65 after updateUTXOs() completes. 2. Anonymity set downloads restart from scratch on interruption: All sectors were accumulated in memory and written to SQLite only after the entire download completed. Now each sector is persisted immediately. On resume, only remaining sectors are fetched. Key design decisions verified against firod source (firoorg/firo@ccaf130): - API uses absolute indices (0 = newest coin, counting backwards) - blockHash pins the iteration start point (stable indices) - Same-block resume: offset indices by prevSize to skip saved coins - Cross-block resume: use `complete` flag on SparkSet to detect whether the previous download finished. Complete → use delta. Partial → full re-download (indices shifted, gap is unavoidable). - INSERT OR IGNORE handles crash-recovery and cross-block overlap - Progress uses consistent (indexOffset + fetched, meta.size) - All groupIds processed every sync (removed skip optimization) - Removed old all-or-nothing writer (dead code) - Schema: added `complete` column + UNIQUE index on SparkSetCoins https://claude.ai/code/session_01GF78pBWxrpN9rfsLEEwbMR --- lib/db/sqlite/firo_cache.dart | 19 +++ lib/db/sqlite/firo_cache_coordinator.dart | 102 ++++++++++-- lib/db/sqlite/firo_cache_reader.dart | 2 +- lib/db/sqlite/firo_cache_worker.dart | 18 +- lib/db/sqlite/firo_cache_writer.dart | 94 +++++++---- .../electrumx_response/spark_models.dart | 5 +- lib/wallets/wallet/wallet.dart | 154 ++++++++++-------- .../spark_interface.dart | 21 +-- 8 files changed, 276 insertions(+), 139 deletions(-) diff --git a/lib/db/sqlite/firo_cache.dart b/lib/db/sqlite/firo_cache.dart index 9a0d83f6a2..8b942dbbdf 100644 --- a/lib/db/sqlite/firo_cache.dart +++ b/lib/db/sqlite/firo_cache.dart @@ -84,6 +84,24 @@ abstract class _FiroCache { sparkSetCacheFile.path, mode: OpenMode.readWrite, ); + + // Migrations: safe to run on every startup (IF NOT EXISTS / IF NOT). + _setCacheDB[network]!.execute(""" + CREATE UNIQUE INDEX IF NOT EXISTS idx_sparksetcoins_set_coin + ON SparkSetCoins(setId, coinId); + """); + + // Add `complete` column to SparkSet for tracking whether a download + // finished. Existing rows default to 1 (complete) since the old + // all-or-nothing writer only saved on full completion. + try { + _setCacheDB[network]!.execute(""" + ALTER TABLE SparkSet ADD COLUMN complete INTEGER NOT NULL DEFAULT 1; + """); + } catch (_) { + // Column already exists — safe to ignore. + } + _usedTagsCacheDB[network] = sqlite3.open( sparkUsedTagsCacheFile.path, mode: OpenMode.readWrite, @@ -133,6 +151,7 @@ abstract class _FiroCache { setHash TEXT NOT NULL, groupId INTEGER NOT NULL, size INTEGER NOT NULL, + complete INTEGER NOT NULL DEFAULT 0, UNIQUE (blockHash, setHash, groupId) ); diff --git a/lib/db/sqlite/firo_cache_coordinator.dart b/lib/db/sqlite/firo_cache_coordinator.dart index 5ecd7534b9..138d4ee506 100644 --- a/lib/db/sqlite/firo_cache_coordinator.dart +++ b/lib/db/sqlite/firo_cache_coordinator.dart @@ -104,52 +104,117 @@ abstract class FiroCacheCoordinator { progressUpdated?.call(prevSize, meta.size); - if (prevMeta?.blockHash == meta.blockHash) { - Logging.instance.d("prevMeta?.blockHash == meta.blockHash"); + if (prevMeta?.blockHash == meta.blockHash && + prevMeta!.size >= meta.size) { + Logging.instance.d( + "prevMeta matches meta blockHash and size >= meta.size, " + "already up to date", + ); return; } - final numberOfCoinsToFetch = meta.size - prevSize; + // When resuming a partial download of the SAME block, we can skip + // already-saved coins because the index space hasn't shifted. + // + // When the block changes, we check the `complete` flag on the + // previous SparkSet to determine if the old download finished. + // - Complete: use the delta (meta.size - prevSize) from index 0. + // The newest coins in the new block are at the lowest indices. + // - Partial: indices have shifted due to the new block, so we + // can't reliably compute which coins are missing. Re-download + // the full set from index 0. INSERT OR IGNORE handles overlap. + final bool sameBlock = prevMeta?.blockHash == meta.blockHash; + + final int numberOfCoinsToFetch; + final int indexOffset; + + if (sameBlock) { + // Same block: resume from where we left off. + numberOfCoinsToFetch = meta.size - prevSize; + indexOffset = prevSize; + } else if (prevMeta != null && prevMeta.complete) { + // Different block, but previous download was complete. + // The delta coins are at indices 0..(meta.size - prevSize - 1). + numberOfCoinsToFetch = meta.size - prevSize; + indexOffset = 0; + } else { + // Different block and previous download was partial (or no + // previous data). Must re-download the full set. + numberOfCoinsToFetch = meta.size; + indexOffset = 0; + } + + if (numberOfCoinsToFetch <= 0) { + // Edge case: reorg, stale cache, or already up to date. + return; + } final fullSectorCount = numberOfCoinsToFetch ~/ sectorSize; final remainder = numberOfCoinsToFetch % sectorSize; - final List coins = []; + int coinsSaved = 0; for (int i = 0; i < fullSectorCount; i++) { - final start = (i * sectorSize); + final start = indexOffset + (i * sectorSize); final data = await client.getSparkAnonymitySetBySector( coinGroupId: groupId, latestBlock: meta.blockHash, startIndex: start, endIndex: start + sectorSize, ); - progressUpdated?.call(start + sectorSize, numberOfCoinsToFetch); - coins.addAll(data); + final sectorCoins = + data + .map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId)) + .toList(); + + coinsSaved += sectorCoins.length; + + await _workers[network]!.runTask( + FCTask( + func: FCFuncName._insertSparkAnonSetCoinsIncremental, + data: (meta, sectorCoins, indexOffset + coinsSaved), + ), + ); + + progressUpdated?.call( + indexOffset + (i + 1) * sectorSize, + meta.size, + ); } if (remainder > 0) { + final remainderStart = indexOffset + numberOfCoinsToFetch - remainder; final data = await client.getSparkAnonymitySetBySector( coinGroupId: groupId, latestBlock: meta.blockHash, - startIndex: numberOfCoinsToFetch - remainder, - endIndex: numberOfCoinsToFetch, + startIndex: remainderStart, + endIndex: indexOffset + numberOfCoinsToFetch, ); - progressUpdated?.call(numberOfCoinsToFetch, numberOfCoinsToFetch); - coins.addAll(data); - } + final sectorCoins = + data + .map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId)) + .toList(); - final result = - coins - .map((e) => RawSparkCoin.fromRPCResponse(e as List, groupId)) - .toList(); + coinsSaved += sectorCoins.length; + + await _workers[network]!.runTask( + FCTask( + func: FCFuncName._insertSparkAnonSetCoinsIncremental, + data: (meta, sectorCoins, indexOffset + coinsSaved), + ), + ); + + progressUpdated?.call(meta.size, meta.size); + } + // Mark this SparkSet as complete so cross-block resume knows + // the download finished and can safely use the delta approach. await _workers[network]!.runTask( FCTask( - func: FCFuncName._updateSparkAnonSetCoinsWith, - data: (meta, result), + func: FCFuncName._markSparkAnonSetComplete, + data: meta, ), ); }); @@ -268,6 +333,7 @@ abstract class FiroCacheCoordinator { blockHash: result.first["blockHash"] as String, setHash: result.first["setHash"] as String, size: result.first["size"] as int, + complete: (result.first["complete"] as int) == 1, ); } diff --git a/lib/db/sqlite/firo_cache_reader.dart b/lib/db/sqlite/firo_cache_reader.dart index 67fea7764c..f31e744132 100644 --- a/lib/db/sqlite/firo_cache_reader.dart +++ b/lib/db/sqlite/firo_cache_reader.dart @@ -25,7 +25,7 @@ abstract class _Reader { required Database db, }) async { final query = """ - SELECT ss.blockHash, ss.setHash, ss.size + SELECT ss.blockHash, ss.setHash, ss.size, ss.complete FROM SparkSet ss WHERE ss.groupId = $groupId ORDER BY ss.size DESC diff --git a/lib/db/sqlite/firo_cache_worker.dart b/lib/db/sqlite/firo_cache_worker.dart index abaceb288b..67073b1181 100644 --- a/lib/db/sqlite/firo_cache_worker.dart +++ b/lib/db/sqlite/firo_cache_worker.dart @@ -1,7 +1,8 @@ part of 'firo_cache.dart'; enum FCFuncName { - _updateSparkAnonSetCoinsWith, + _insertSparkAnonSetCoinsIncremental, + _markSparkAnonSetComplete, _updateSparkUsedTagsWith, } @@ -93,13 +94,22 @@ class _FiroCacheWorker { try { final FCResult result; switch (task.func) { - case FCFuncName._updateSparkAnonSetCoinsWith: + case FCFuncName._insertSparkAnonSetCoinsIncremental: final data = - task.data as (SparkAnonymitySetMeta, List); - result = _updateSparkAnonSetCoinsWith( + task.data + as (SparkAnonymitySetMeta, List, int); + result = _insertSparkAnonSetCoinsIncremental( setCacheDb, data.$2, data.$1, + data.$3, + ); + break; + + case FCFuncName._markSparkAnonSetComplete: + result = _markSparkAnonSetComplete( + setCacheDb, + task.data as SparkAnonymitySetMeta, ); break; diff --git a/lib/db/sqlite/firo_cache_writer.dart b/lib/db/sqlite/firo_cache_writer.dart index fadc3eb91c..b97f5867d7 100644 --- a/lib/db/sqlite/firo_cache_writer.dart +++ b/lib/db/sqlite/firo_cache_writer.dart @@ -45,32 +45,23 @@ FCResult _updateSparkUsedTagsWith(Database db, List> tags) { } // =========================================================================== -// ================== write to spark anon set cache ========================== +// =========== incremental write to spark anon set cache ==================== -/// update the sqlite cache +/// Persist a single sector's worth of coins to the cache, creating or +/// updating the SparkSet row as needed. Safe to call repeatedly — uses +/// INSERT OR IGNORE so duplicate coins (from crash-recovery reruns) are +/// silently skipped. /// -/// returns true if successful, otherwise false -FCResult _updateSparkAnonSetCoinsWith( +/// [cumulativeSize] should be prevSize + total coins saved so far (including +/// this batch). It is written to SparkSet.size so that on resume, +/// getLatestSetInfoForGroupId returns the correct partial progress. +FCResult _insertSparkAnonSetCoinsIncremental( Database db, final List coinsRaw, SparkAnonymitySetMeta meta, + int cumulativeSize, ) { if (coinsRaw.isEmpty) { - // no coins to actually insert - return FCResult(success: true); - } - - final checkResult = db.select( - """ - SELECT * - FROM SparkSet - WHERE blockHash = ? AND setHash = ? AND groupId = ?; - """, - [meta.blockHash, meta.setHash, meta.coinGroupId], - ); - - if (checkResult.isNotEmpty) { - // already up to date return FCResult(success: true); } @@ -78,35 +69,65 @@ FCResult _updateSparkAnonSetCoinsWith( db.execute("BEGIN;"); try { + // Create SparkSet row if it doesn't exist yet for this block state. + // complete = 0 marks this as an in-progress download. db.execute( """ - INSERT INTO SparkSet (blockHash, setHash, groupId, size) - VALUES (?, ?, ?, ?); + INSERT OR IGNORE INTO SparkSet (blockHash, setHash, groupId, size, complete) + VALUES (?, ?, ?, 0, 0); + """, + [meta.blockHash, meta.setHash, meta.coinGroupId], + ); + + // Get the SparkSet row's id (whether just created or already existing). + final setIdResult = db.select( + """ + SELECT id FROM SparkSet + WHERE blockHash = ? AND setHash = ? AND groupId = ?; """, - [meta.blockHash, meta.setHash, meta.coinGroupId, meta.size], + [meta.blockHash, meta.setHash, meta.coinGroupId], ); - final setId = db.lastInsertRowId; + final setId = setIdResult.first["id"] as int; for (final coin in coins) { + // INSERT OR IGNORE handles duplicates from crash-recovery reruns. db.execute( """ - INSERT INTO SparkCoin (serialized, txHash, context, groupId) - VALUES (?, ?, ?, ?); - """, + INSERT OR IGNORE INTO SparkCoin (serialized, txHash, context, groupId) + VALUES (?, ?, ?, ?); + """, [coin.serialized, coin.txHash, coin.context, coin.groupId], ); - final coinId = db.lastInsertRowId; - // finally add the row id to the newly added set + // lastInsertRowId is 0 when INSERT OR IGNORE skips a duplicate, + // so we must SELECT explicitly. + final coinIdResult = db.select( + """ + SELECT id FROM SparkCoin + WHERE serialized = ? AND txHash = ? AND context = ? AND groupId = ?; + """, + [coin.serialized, coin.txHash, coin.context, coin.groupId], + ); + final coinId = coinIdResult.first["id"] as int; + db.execute( """ - INSERT INTO SparkSetCoins (setId, coinId) + INSERT OR IGNORE INTO SparkSetCoins (setId, coinId) VALUES (?, ?); """, [setId, coinId], ); } + // Update cumulative size to track partial progress. + db.execute( + """ + UPDATE SparkSet SET size = ? + WHERE id = ?; + """, + [cumulativeSize, setId], + ); + db.execute("COMMIT;"); return FCResult(success: true); @@ -115,3 +136,18 @@ FCResult _updateSparkAnonSetCoinsWith( return FCResult(success: false, error: e); } } + +/// Mark a SparkSet row as complete after all sectors have been downloaded. +FCResult _markSparkAnonSetComplete( + Database db, + SparkAnonymitySetMeta meta, +) { + db.execute( + """ + UPDATE SparkSet SET complete = 1 + WHERE blockHash = ? AND setHash = ? AND groupId = ?; + """, + [meta.blockHash, meta.setHash, meta.coinGroupId], + ); + return FCResult(success: true); +} diff --git a/lib/models/electrumx_response/spark_models.dart b/lib/models/electrumx_response/spark_models.dart index 22c6cf25fe..2bb69247ff 100644 --- a/lib/models/electrumx_response/spark_models.dart +++ b/lib/models/electrumx_response/spark_models.dart @@ -30,12 +30,14 @@ class SparkAnonymitySetMeta { final String blockHash; final String setHash; final int size; + final bool complete; SparkAnonymitySetMeta({ required this.coinGroupId, required this.blockHash, required this.setHash, required this.size, + this.complete = false, }); @override @@ -44,7 +46,8 @@ class SparkAnonymitySetMeta { "coinGroupId: $coinGroupId, " "blockHash: $blockHash, " "setHash: $setHash, " - "size: $size" + "size: $size, " + "complete: $complete" "}"; } } diff --git a/lib/wallets/wallet/wallet.dart b/lib/wallets/wallet/wallet.dart index 1aa40ef6a7..b70c852104 100644 --- a/lib/wallets/wallet/wallet.dart +++ b/lib/wallets/wallet/wallet.dart @@ -641,95 +641,105 @@ abstract class Wallet { ); } - // add some small buffer before making calls. - // this can probably be removed in the future but was added as a - // debugging feature - await Future.delayed(const Duration(milliseconds: 300)); - - // TODO: [prio=low] handle this differently. Extra modification of this file for coin specific functionality should be avoided. - final Set codesToCheck = {}; - if (this is PaynymInterface && !viewOnly) { - // isSegwit does not matter here at all - final myCode = await (this as PaynymInterface).getPaymentCode( - isSegwit: false, - ); + await Future(() async { + // add some small buffer before making calls. + // this can probably be removed in the future but was added as a + // debugging feature + await Future.delayed(const Duration(milliseconds: 300)); + + // TODO: [prio=low] handle this differently. Extra modification of this file for coin specific functionality should be avoided. + final Set codesToCheck = {}; + if (this is PaynymInterface && !viewOnly) { + // isSegwit does not matter here at all + final myCode = await (this as PaynymInterface).getPaymentCode( + isSegwit: false, + ); - final nym = await PaynymIsApi().nym(myCode.toString()); - if (nym.value != null) { - for (final follower in nym.value!.followers) { - codesToCheck.add(follower.code); - } - for (final following in nym.value!.following) { - codesToCheck.add(following.code); + final nym = await PaynymIsApi().nym(myCode.toString()); + if (nym.value != null) { + for (final follower in nym.value!.followers) { + codesToCheck.add(follower.code); + } + for (final following in nym.value!.following) { + codesToCheck.add(following.code); + } } } - } - _fireRefreshPercentChange(0); - await updateChainHeight(); + _fireRefreshPercentChange(0); + await updateChainHeight(); - if (this is BitcoinFrostWallet) { - await (this as BitcoinFrostWallet).lookAhead(); - } + if (this is BitcoinFrostWallet) { + await (this as BitcoinFrostWallet).lookAhead(); + } - _fireRefreshPercentChange(0.1); + _fireRefreshPercentChange(0.1); - // TODO: [prio=low] handle this differently. Extra modification of this file for coin specific functionality should be avoided. - if (this is MultiAddressInterface) { - if (info.otherData[WalletInfoKeys.reuseAddress] != true) { - await (this as MultiAddressInterface) - .checkReceivingAddressForTransactions(); + // TODO: [prio=low] handle this differently. Extra modification of this file for coin specific functionality should be avoided. + if (this is MultiAddressInterface) { + if (info.otherData[WalletInfoKeys.reuseAddress] != true) { + await (this as MultiAddressInterface) + .checkReceivingAddressForTransactions(); + } } - } - _fireRefreshPercentChange(0.2); + _fireRefreshPercentChange(0.2); - // TODO: [prio=low] handle this differently. Extra modification of this file for coin specific functionality should be avoided. - if (this is MultiAddressInterface) { - if (info.otherData[WalletInfoKeys.reuseAddress] != true) { - await (this as MultiAddressInterface) - .checkChangeAddressForTransactions(); + // TODO: [prio=low] handle this differently. Extra modification of this file for coin specific functionality should be avoided. + if (this is MultiAddressInterface) { + if (info.otherData[WalletInfoKeys.reuseAddress] != true) { + await (this as MultiAddressInterface) + .checkChangeAddressForTransactions(); + } + } + _fireRefreshPercentChange(0.3); + if (this is SparkInterface && !viewOnly) { + // this should be called before updateTransactions() + await (this as SparkInterface).refreshSparkData((0.3, 0.6)); } - } - _fireRefreshPercentChange(0.3); - if (this is SparkInterface && !viewOnly) { - // this should be called before updateTransactions() - await (this as SparkInterface).refreshSparkData((0.3, 0.6)); - } - if (this is NamecoinWallet) { - await updateUTXOs(); - _fireRefreshPercentChange(0.6); - await (this as NamecoinWallet).checkAutoRegisterNameNewOutputs(); - _fireRefreshPercentChange(0.70); - await updateTransactions(); - } else { - final fetchFuture = updateTransactions(); - _fireRefreshPercentChange(0.6); - final utxosRefreshFuture = updateUTXOs(); - _fireRefreshPercentChange(0.65); - await utxosRefreshFuture; - _fireRefreshPercentChange(0.70); - await fetchFuture; - } + if (this is NamecoinWallet) { + await updateUTXOs(); + _fireRefreshPercentChange(0.6); + await (this as NamecoinWallet).checkAutoRegisterNameNewOutputs(); + _fireRefreshPercentChange(0.70); + await updateTransactions(); + } else { + final fetchFuture = updateTransactions(); + _fireRefreshPercentChange(0.6); + final utxosRefreshFuture = updateUTXOs(); + await utxosRefreshFuture; + _fireRefreshPercentChange(0.65); + await fetchFuture; + _fireRefreshPercentChange(0.70); + } - // TODO: [prio=low] handle this differently. Extra modification of this file for coin specific functionality should be avoided. - if (!viewOnly && this is PaynymInterface && codesToCheck.isNotEmpty) { - await (this as PaynymInterface).checkForNotificationTransactionsTo( - codesToCheck, - ); - // check utxos again for notification outputs - await updateUTXOs(); - } - _fireRefreshPercentChange(0.80); + // TODO: [prio=low] handle this differently. Extra modification of this file for coin specific functionality should be avoided. + if (!viewOnly && this is PaynymInterface && codesToCheck.isNotEmpty) { + await (this as PaynymInterface).checkForNotificationTransactionsTo( + codesToCheck, + ); + // check utxos again for notification outputs + await updateUTXOs(); + } + _fireRefreshPercentChange(0.80); - // await getAllTxsToWatch(); + // await getAllTxsToWatch(); - _fireRefreshPercentChange(0.90); + _fireRefreshPercentChange(0.90); - await updateBalance(); + await updateBalance(); - _fireRefreshPercentChange(1.0); + _fireRefreshPercentChange(1.0); + }).timeout( + const Duration(minutes: 5), + onTimeout: () { + throw TimeoutException( + 'Wallet refresh timed out for $walletId', + const Duration(minutes: 5), + ); + }, + ); completer.complete(); } catch (error, strace) { diff --git a/lib/wallets/wallet/wallet_mixin_interfaces/spark_interface.dart b/lib/wallets/wallet/wallet_mixin_interfaces/spark_interface.dart index 0dec8aab29..615628c751 100644 --- a/lib/wallets/wallet/wallet_mixin_interfaces/spark_interface.dart +++ b/lib/wallets/wallet/wallet_mixin_interfaces/spark_interface.dart @@ -1073,20 +1073,13 @@ mixin SparkInterface // missing groupIds to the list if sets to check and update final latestGroupId = await electrumXClient.getSparkLatestCoinId(); - final List groupIds = []; - if (latestGroupId > 1) { - for (int id = 1; id < latestGroupId; id++) { - final setExists = - await FiroCacheCoordinator.checkSetInfoForGroupIdExists( - id, - cryptoCurrency.network, - ); - if (!setExists) { - groupIds.add(id); - } - } - } - groupIds.add(latestGroupId); + // Process all groupIds every sync. The coordinator's early-return + // check (blockHash + size comparison) makes complete groups a no-op + // (just one meta RPC call). This ensures partially-downloaded groups + // are always completed. + final List groupIds = [ + for (int id = 1; id <= latestGroupId; id++) id, + ]; final steps = groupIds.length +