From 2c5e979f69076fc8dd3a6774ef9d9440dc7fb549 Mon Sep 17 00:00:00 2001 From: benk10 Date: Thu, 22 Jan 2026 17:43:22 -0500 Subject: [PATCH] fix: handle node connection issues --- .../java/to/bitkit/repositories/HealthRepo.kt | 109 +++++++++-------- .../to/bitkit/repositories/LightningRepo.kt | 110 +++++++++++++++++- .../java/to/bitkit/repositories/WalletRepo.kt | 2 +- .../to/bitkit/repositories/HealthRepoTest.kt | 47 +++++++- .../bitkit/repositories/LightningRepoTest.kt | 29 +++++ 5 files changed, 241 insertions(+), 56 deletions(-) diff --git a/app/src/main/java/to/bitkit/repositories/HealthRepo.kt b/app/src/main/java/to/bitkit/repositories/HealthRepo.kt index ba81e5f56..42c665cb5 100644 --- a/app/src/main/java/to/bitkit/repositories/HealthRepo.kt +++ b/app/src/main/java/to/bitkit/repositories/HealthRepo.kt @@ -11,6 +11,7 @@ import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.update import kotlinx.coroutines.launch +import org.lightningdevkit.ldknode.ChannelDetails import to.bitkit.data.CacheStore import to.bitkit.di.BgDispatcher import to.bitkit.models.BackupCategory @@ -43,68 +44,80 @@ class HealthRepo @Inject constructor( observeBackupStatus() } - @Suppress("CyclomaticComplexMethod") private fun collectState() { - val internetHealthState = connectivityRepo.isOnline.map { connectivityState -> - when (connectivityState) { - ConnectivityState.CONNECTED -> HealthState.READY - ConnectivityState.CONNECTING -> HealthState.PENDING - ConnectivityState.DISCONNECTED -> HealthState.ERROR - } - } + val internetHealthState = connectivityRepo.isOnline.map { it.asHealth() } repoScope.launch { combine( internetHealthState, lightningRepo.lightningState, ) { internetHealth, lightningState -> - val isOnline = internetHealth == HealthState.READY - val nodeLifecycleState = lightningState.nodeLifecycleState - - val nodeHealth = when { - !isOnline -> HealthState.ERROR - else -> nodeLifecycleState.asHealth() - } - - val electrumHealth = when { - !isOnline -> HealthState.ERROR - nodeLifecycleState.isRunning() -> HealthState.READY - nodeLifecycleState.canRun() -> HealthState.PENDING - else -> HealthState.ERROR - } - - val channelsHealth = when { - !isOnline -> HealthState.ERROR - else -> { - val channels = lightningState.channels - val hasOpenChannels = channels.any { it.isChannelReady } - val hasPendingChannels = channels.any { !it.isChannelReady } - - when { - hasOpenChannels -> HealthState.READY - hasPendingChannels -> HealthState.PENDING - else -> HealthState.ERROR - } - } - } - - AppHealthState( - internet = internetHealth, - electrum = electrumHealth, - node = nodeHealth, - channels = channelsHealth, - ) + computeHealthState(internetHealth, lightningState) }.collect { newHealthState -> - updateState { currentState -> - newHealthState.copy( - backups = currentState.backups, - app = currentState.app, + updateState { + it.copy( + internet = newHealthState.internet, + electrum = newHealthState.electrum, + node = newHealthState.node, + channels = newHealthState.channels, ) } } } } + @Suppress("CyclomaticComplexMethod") + private fun computeHealthState(internetHealth: HealthState, lightningState: LightningState): AppHealthState { + val isOnline = internetHealth == HealthState.READY + val nodeLifecycleState = lightningState.nodeLifecycleState + val isSyncing = lightningState.isSyncingWallet + val hasSyncError = lightningState.lastSyncError != null + + val nodeHealth = when { + !isOnline -> HealthState.ERROR + isSyncing -> HealthState.PENDING + hasSyncError && nodeLifecycleState.isRunning() -> HealthState.ERROR + else -> nodeLifecycleState.asHealth() + } + + val electrumHealth = when { + !isOnline -> HealthState.ERROR + isSyncing -> HealthState.PENDING + hasSyncError && nodeLifecycleState.isRunning() -> HealthState.ERROR + nodeLifecycleState.isRunning() -> HealthState.READY + nodeLifecycleState.canRun() -> HealthState.PENDING + else -> HealthState.ERROR + } + + val channelsHealth = when { + !isOnline -> HealthState.ERROR + else -> computeChannelsHealth(lightningState.channels) + } + + return AppHealthState( + internet = internetHealth, + electrum = electrumHealth, + node = nodeHealth, + channels = channelsHealth, + ) + } + + private fun computeChannelsHealth(channels: List): HealthState { + val hasOpenChannels = channels.any { it.isChannelReady } + val hasPendingChannels = channels.any { !it.isChannelReady } + return when { + hasOpenChannels -> HealthState.READY + hasPendingChannels -> HealthState.PENDING + else -> HealthState.ERROR + } + } + + private fun ConnectivityState.asHealth() = when (this) { + ConnectivityState.CONNECTED -> HealthState.READY + ConnectivityState.CONNECTING -> HealthState.PENDING + ConnectivityState.DISCONNECTED -> HealthState.ERROR + } + private fun observePaidOrdersState() { repoScope.launch { blocktankRepo.blocktankState.map { it.paidOrders }.distinctUntilChanged().collect { paidOrders -> diff --git a/app/src/main/java/to/bitkit/repositories/LightningRepo.kt b/app/src/main/java/to/bitkit/repositories/LightningRepo.kt index 7cdd9ca55..f996b6262 100644 --- a/app/src/main/java/to/bitkit/repositories/LightningRepo.kt +++ b/app/src/main/java/to/bitkit/repositories/LightningRepo.kt @@ -11,14 +11,18 @@ import com.synonym.bitkitcore.createWithdrawCallbackUrl import com.synonym.bitkitcore.lnurlAuth import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.update +import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.tasks.await @@ -64,6 +68,7 @@ import to.bitkit.utils.Logger import to.bitkit.utils.ServiceError import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference import javax.inject.Inject import javax.inject.Singleton import kotlin.coroutines.cancellation.CancellationException @@ -84,6 +89,7 @@ class LightningRepo @Inject constructor( private val lnurlService: LnurlService, private val cacheStore: CacheStore, private val preActivityMetadataRepo: PreActivityMetadataRepo, + private val connectivityRepo: ConnectivityRepo, ) { private val _lightningState = MutableStateFlow(LightningState()) val lightningState = _lightningState.asStateFlow() @@ -101,6 +107,66 @@ class LightningRepo @Inject constructor( private val syncMutex = Mutex() private val syncPending = AtomicBoolean(false) + private val syncRetryJob = AtomicReference(null) + + init { + observeConnectivityForSyncRetry() + } + + private fun observeConnectivityForSyncRetry() { + scope.launch { + connectivityRepo.isOnline + .map { it == ConnectivityState.CONNECTED } + .distinctUntilChanged() + .collect { isConnected -> + if (!isConnected) { + // Cancel any pending retry when disconnected + syncRetryJob.getAndSet(null)?.cancel() + return@collect + } + + // Start retry loop if sync is failing + startSyncRetryLoopIfNeeded() + } + } + } + + private fun startSyncRetryLoopIfNeeded() { + val state = _lightningState.value + if (!state.nodeLifecycleState.isRunning() || state.lastSyncError == null) { + return + } + + // Don't start if already retrying + if (syncRetryJob.get()?.isActive == true) { + return + } + + val job = scope.launch { + // Don't start retry loop if offline + if (connectivityRepo.isOnline.first() != ConnectivityState.CONNECTED) { + return@launch + } + + while (isActive) { + val currentState = _lightningState.value + // Stop if no longer running or sync is now healthy + if (!currentState.nodeLifecycleState.isRunning() || currentState.isSyncHealthy) { + Logger.debug("Sync retry loop stopped: node not running or sync healthy", context = TAG) + break + } + + delay(SYNC_RETRY_DELAY_MS) + Logger.info("Retrying sync after failure", context = TAG) + sync().onSuccess { + Logger.info("Sync retry succeeded", context = TAG) + }.onFailure { + Logger.warn("Sync retry failed, will retry in ${SYNC_RETRY_DELAY_MS / 1000}s", it, context = TAG) + } + } + } + syncRetryJob.set(job) + } /** * Executes the provided operation only if the node is running. @@ -318,6 +384,7 @@ class LightningRepo @Inject constructor( } } + @Suppress("TooGenericExceptionCaught") suspend fun sync(): Result = executeWhenNodeRunning("sync") { // If sync is in progress, mark pending and skip if (!syncMutex.tryLock()) { @@ -326,21 +393,28 @@ class LightningRepo @Inject constructor( return@executeWhenNodeRunning Result.success(Unit) } - try { + runCatching { do { syncPending.set(false) _lightningState.update { it.copy(isSyncingWallet = true) } lightningService.sync() refreshChannelCache() syncState() + _lightningState.update { + it.copy( + lastSyncError = null, + lastSuccessfulSyncAt = System.currentTimeMillis(), + ) + } if (syncPending.get()) delay(MS_SYNC_LOOP_DEBOUNCE) } while (syncPending.getAndSet(false)) - } finally { - _lightningState.update { it.copy(isSyncingWallet = false) } + }.also { + _lightningState.update { state -> state.copy(isSyncingWallet = false) } syncMutex.unlock() + }.onFailure { + _lightningState.update { state -> state.copy(lastSyncError = it) } + startSyncRetryLoopIfNeeded() } - - Result.success(Unit) } fun syncAsync() = scope.launch { @@ -349,6 +423,16 @@ class LightningRepo @Inject constructor( } } + private suspend fun ensureSyncedBeforeSend(): Result { + Logger.debug("Ensuring wallet is synced before send", context = TAG) + return sync().fold( + onSuccess = { Result.success(Unit) }, + onFailure = { + Result.failure(SyncUnhealthyError()) + }, + ) + } + /** Clear pending sync flag. Called when manual pull-to-refresh takes priority. */ fun clearPendingSync() = syncPending.set(false) @@ -634,6 +718,11 @@ class LightningRepo @Inject constructor( ): Result = executeWhenNodeRunning("sendOnChain") { require(address.isNotEmpty()) { "Send address cannot be empty" } + // Ensure wallet is synced before sending to have up-to-date state + ensureSyncedBeforeSend().onFailure { + return@executeWhenNodeRunning Result.failure(it) + } + val transactionSpeed = speed ?: settingsStore.data.first().defaultTransactionSpeed val satsPerVByte = getFeeRateForSpeed(transactionSpeed, feeRates).getOrThrow() @@ -960,6 +1049,7 @@ class LightningRepo @Inject constructor( private const val TAG = "LightningRepo" private const val LENGTH_CHANNEL_ID_PREVIEW = 10 private const val MS_SYNC_LOOP_DEBOUNCE = 500L + private const val SYNC_RETRY_DELAY_MS = 15_000L } } @@ -968,6 +1058,7 @@ class NodeSetupError : AppError("Unknown node setup error") class NodeStopTimeoutError : AppError("Timeout waiting for node to stop") class NodeRunTimeoutError(opName: String) : AppError("Timeout waiting for node to run and execute: '$opName'") class GetPaymentsError : AppError("It wasn't possible get the payments") +class SyncUnhealthyError : AppError("Wallet sync failed before send") data class LightningState( val nodeId: String = "", @@ -978,6 +1069,15 @@ data class LightningState( val balances: BalanceDetails? = null, val isSyncingWallet: Boolean = false, val isGeoBlocked: Boolean = false, + val lastSyncError: Throwable? = null, + val lastSuccessfulSyncAt: Long? = null, ) { fun block(): BestBlock? = nodeStatus?.currentBestBlock + + /** + * Returns true if the node has synced successfully at least once and the last sync didn't fail. + * This is used to determine if critical operations like sending should be allowed. + */ + val isSyncHealthy: Boolean + get() = lastSyncError == null && lastSuccessfulSyncAt != null } diff --git a/app/src/main/java/to/bitkit/repositories/WalletRepo.kt b/app/src/main/java/to/bitkit/repositories/WalletRepo.kt index 5074005d4..fe3c77512 100644 --- a/app/src/main/java/to/bitkit/repositories/WalletRepo.kt +++ b/app/src/main/java/to/bitkit/repositories/WalletRepo.kt @@ -202,7 +202,7 @@ class WalletRepo @Inject constructor( eventSyncJob?.cancel() eventSyncJob = repoScope.launch { delay(EVENT_SYNC_DEBOUNCE_MS) - syncNodeAndWallet() + syncBalances() transferRepo.syncTransferStates() } } diff --git a/app/src/test/java/to/bitkit/repositories/HealthRepoTest.kt b/app/src/test/java/to/bitkit/repositories/HealthRepoTest.kt index f0ecfef10..97b87f617 100644 --- a/app/src/test/java/to/bitkit/repositories/HealthRepoTest.kt +++ b/app/src/test/java/to/bitkit/repositories/HealthRepoTest.kt @@ -227,7 +227,10 @@ class HealthRepoTest : BaseUnitTest() { @Test fun `lightning node state maps correctly when online`() = test { - val lightningState = LightningState(nodeLifecycleState = NodeLifecycleState.Running) + val lightningState = LightningState( + nodeLifecycleState = NodeLifecycleState.Running, + lastSuccessfulSyncAt = System.currentTimeMillis(), + ) whenever(lightningRepo.lightningState).thenReturn(MutableStateFlow(lightningState)) sut = createSut() @@ -282,7 +285,10 @@ class HealthRepoTest : BaseUnitTest() { val connectivityFlow = MutableStateFlow(ConnectivityState.CONNECTED) whenever(connectivityRepo.isOnline).thenReturn(connectivityFlow) - val lightningState = LightningState(nodeLifecycleState = NodeLifecycleState.Running) + val lightningState = LightningState( + nodeLifecycleState = NodeLifecycleState.Running, + lastSuccessfulSyncAt = System.currentTimeMillis(), + ) whenever(lightningRepo.lightningState).thenReturn(MutableStateFlow(lightningState)) whenever(cacheStore.backupStatuses).thenReturn(flowOf(emptyMap())) @@ -322,6 +328,43 @@ class HealthRepoTest : BaseUnitTest() { } } + @Test + fun `node and electrum are error when sync fails`() = test { + val lightningState = LightningState( + nodeLifecycleState = NodeLifecycleState.Running, + lastSyncError = RuntimeException("Sync failed"), + lastSuccessfulSyncAt = null, + ) + whenever(lightningRepo.lightningState).thenReturn(MutableStateFlow(lightningState)) + + sut = createSut() + + sut.healthState.test { + val state = awaitItem() + assertEquals(HealthState.ERROR, state.node) + assertEquals(HealthState.ERROR, state.electrum) + cancelAndIgnoreRemainingEvents() + } + } + + @Test + fun `node and electrum are pending when syncing`() = test { + val lightningState = LightningState( + nodeLifecycleState = NodeLifecycleState.Running, + isSyncingWallet = true, + ) + whenever(lightningRepo.lightningState).thenReturn(MutableStateFlow(lightningState)) + + sut = createSut() + + sut.healthState.test { + val state = awaitItem() + assertEquals(HealthState.PENDING, state.node) + assertEquals(HealthState.PENDING, state.electrum) + cancelAndIgnoreRemainingEvents() + } + } + @Test fun `isOnline returns true when internet is ready`() = test { sut = createSut() diff --git a/app/src/test/java/to/bitkit/repositories/LightningRepoTest.kt b/app/src/test/java/to/bitkit/repositories/LightningRepoTest.kt index 6a3695570..dc797b8b5 100644 --- a/app/src/test/java/to/bitkit/repositories/LightningRepoTest.kt +++ b/app/src/test/java/to/bitkit/repositories/LightningRepoTest.kt @@ -63,10 +63,12 @@ class LightningRepoTest : BaseUnitTest() { private val cacheStore = mock() private val preActivityMetadataRepo = mock() private val lnurlService = mock() + private val connectivityRepo = mock() @Before fun setUp() = runBlocking { whenever(coreService.isGeoBlocked()).thenReturn(false) + whenever(connectivityRepo.isOnline).thenReturn(flowOf(ConnectivityState.CONNECTED)) sut = LightningRepo( bgDispatcher = testDispatcher, lightningService = lightningService, @@ -78,6 +80,7 @@ class LightningRepoTest : BaseUnitTest() { lnurlService = lnurlService, cacheStore = cacheStore, preActivityMetadataRepo = preActivityMetadataRepo, + connectivityRepo = connectivityRepo, ) } @@ -86,12 +89,15 @@ class LightningRepoTest : BaseUnitTest() { whenever(lightningService.node).thenReturn(mock()) whenever(lightningService.setup(any(), anyOrNull(), anyOrNull(), anyOrNull(), anyOrNull())).thenReturn(Unit) whenever(lightningService.start(anyOrNull(), any())).thenReturn(Unit) + whenever(lightningService.sync()).thenReturn(Unit) whenever(settingsStore.data).thenReturn(flowOf(SettingsData())) val blocktank = mock() whenever(coreService.blocktank).thenReturn(blocktank) whenever(blocktank.info(any())).thenReturn(null) val result = sut.start() assertTrue(result.isSuccess) + // Simulate successful sync to set isSyncHealthy = true + sut.sync() } @Test @@ -372,6 +378,29 @@ class LightningRepoTest : BaseUnitTest() { assertTrue(result.isFailure) } + @Test + fun `sendOnChain should fail when sync is unhealthy`() = test { + // Start node but make sync fail (isSyncHealthy = false) + // Mock connectivity as disconnected to prevent retry loop from running indefinitely + whenever(connectivityRepo.isOnline).thenReturn(flowOf(ConnectivityState.DISCONNECTED)) + sut.setInitNodeLifecycleState() + whenever(lightningService.node).thenReturn(mock()) + whenever(lightningService.setup(any(), anyOrNull(), anyOrNull(), anyOrNull(), anyOrNull())).thenReturn(Unit) + whenever(lightningService.start(anyOrNull(), any())).thenReturn(Unit) + whenever(lightningService.sync()).thenThrow(RuntimeException("Sync failed")) + whenever(settingsStore.data).thenReturn(flowOf(SettingsData())) + val blocktank = mock() + whenever(coreService.blocktank).thenReturn(blocktank) + whenever(blocktank.info(any())).thenReturn(null) + sut.start() + + // Sync failed during start(), so isSyncHealthy should be false + + val result = sut.sendOnChain("address", 1000uL) + assertTrue(result.isFailure) + assertTrue(result.exceptionOrNull() is SyncUnhealthyError) + } + @Test fun `sendOnChain should cache activity meta data`() = test { val mockSettingsData = SettingsData(