-
Notifications
You must be signed in to change notification settings - Fork 2
Handle node connection issues #717
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,14 +11,18 @@ import com.synonym.bitkitcore.createWithdrawCallbackUrl | |
| import com.synonym.bitkitcore.lnurlAuth | ||
| import kotlinx.coroutines.CoroutineDispatcher | ||
| import kotlinx.coroutines.CoroutineScope | ||
| import kotlinx.coroutines.Job | ||
| import kotlinx.coroutines.SupervisorJob | ||
| import kotlinx.coroutines.delay | ||
| import kotlinx.coroutines.flow.MutableSharedFlow | ||
| import kotlinx.coroutines.flow.MutableStateFlow | ||
| import kotlinx.coroutines.flow.asSharedFlow | ||
| import kotlinx.coroutines.flow.asStateFlow | ||
| import kotlinx.coroutines.flow.distinctUntilChanged | ||
| import kotlinx.coroutines.flow.first | ||
| import kotlinx.coroutines.flow.map | ||
| import kotlinx.coroutines.flow.update | ||
| import kotlinx.coroutines.isActive | ||
| import kotlinx.coroutines.launch | ||
| import kotlinx.coroutines.sync.Mutex | ||
| import kotlinx.coroutines.tasks.await | ||
|
|
@@ -64,6 +68,7 @@ import to.bitkit.utils.Logger | |
| import to.bitkit.utils.ServiceError | ||
| import java.util.concurrent.ConcurrentHashMap | ||
| import java.util.concurrent.atomic.AtomicBoolean | ||
| import java.util.concurrent.atomic.AtomicReference | ||
| import javax.inject.Inject | ||
| import javax.inject.Singleton | ||
| import kotlin.coroutines.cancellation.CancellationException | ||
|
|
@@ -84,6 +89,7 @@ class LightningRepo @Inject constructor( | |
| private val lnurlService: LnurlService, | ||
| private val cacheStore: CacheStore, | ||
| private val preActivityMetadataRepo: PreActivityMetadataRepo, | ||
| private val connectivityRepo: ConnectivityRepo, | ||
| ) { | ||
| private val _lightningState = MutableStateFlow(LightningState()) | ||
| val lightningState = _lightningState.asStateFlow() | ||
|
|
@@ -101,6 +107,66 @@ class LightningRepo @Inject constructor( | |
|
|
||
| private val syncMutex = Mutex() | ||
| private val syncPending = AtomicBoolean(false) | ||
| private val syncRetryJob = AtomicReference<Job?>(null) | ||
|
|
||
| init { | ||
| observeConnectivityForSyncRetry() | ||
| } | ||
|
|
||
| private fun observeConnectivityForSyncRetry() { | ||
| scope.launch { | ||
| connectivityRepo.isOnline | ||
| .map { it == ConnectivityState.CONNECTED } | ||
| .distinctUntilChanged() | ||
| .collect { isConnected -> | ||
| if (!isConnected) { | ||
| // Cancel any pending retry when disconnected | ||
| syncRetryJob.getAndSet(null)?.cancel() | ||
| return@collect | ||
| } | ||
|
|
||
| // Start retry loop if sync is failing | ||
| startSyncRetryLoopIfNeeded() | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private fun startSyncRetryLoopIfNeeded() { | ||
| val state = _lightningState.value | ||
| if (!state.nodeLifecycleState.isRunning() || state.lastSyncError == null) { | ||
| return | ||
| } | ||
|
|
||
| // Don't start if already retrying | ||
| if (syncRetryJob.get()?.isActive == true) { | ||
| return | ||
| } | ||
|
|
||
| val job = scope.launch { | ||
| // Don't start retry loop if offline | ||
| if (connectivityRepo.isOnline.first() != ConnectivityState.CONNECTED) { | ||
| return@launch | ||
| } | ||
|
|
||
| while (isActive) { | ||
| val currentState = _lightningState.value | ||
| // Stop if no longer running or sync is now healthy | ||
| if (!currentState.nodeLifecycleState.isRunning() || currentState.isSyncHealthy) { | ||
| Logger.debug("Sync retry loop stopped: node not running or sync healthy", context = TAG) | ||
| break | ||
| } | ||
|
|
||
| delay(SYNC_RETRY_DELAY_MS) | ||
| Logger.info("Retrying sync after failure", context = TAG) | ||
| sync().onSuccess { | ||
| Logger.info("Sync retry succeeded", context = TAG) | ||
| }.onFailure { | ||
| Logger.warn("Sync retry failed, will retry in ${SYNC_RETRY_DELAY_MS / 1000}s", it, context = TAG) | ||
| } | ||
| } | ||
| } | ||
| syncRetryJob.set(job) | ||
| } | ||
|
|
||
| /** | ||
| * Executes the provided operation only if the node is running. | ||
|
|
@@ -318,6 +384,7 @@ class LightningRepo @Inject constructor( | |
| } | ||
| } | ||
|
|
||
| @Suppress("TooGenericExceptionCaught") | ||
| suspend fun sync(): Result<Unit> = executeWhenNodeRunning("sync") { | ||
| // If sync is in progress, mark pending and skip | ||
| if (!syncMutex.tryLock()) { | ||
|
|
@@ -326,21 +393,28 @@ class LightningRepo @Inject constructor( | |
| return@executeWhenNodeRunning Result.success(Unit) | ||
| } | ||
|
|
||
| try { | ||
| runCatching { | ||
| do { | ||
| syncPending.set(false) | ||
| _lightningState.update { it.copy(isSyncingWallet = true) } | ||
| lightningService.sync() | ||
| refreshChannelCache() | ||
| syncState() | ||
| _lightningState.update { | ||
| it.copy( | ||
| lastSyncError = null, | ||
| lastSuccessfulSyncAt = System.currentTimeMillis(), | ||
| ) | ||
| } | ||
| if (syncPending.get()) delay(MS_SYNC_LOOP_DEBOUNCE) | ||
| } while (syncPending.getAndSet(false)) | ||
| } finally { | ||
| _lightningState.update { it.copy(isSyncingWallet = false) } | ||
| }.also { | ||
| _lightningState.update { state -> state.copy(isSyncingWallet = false) } | ||
| syncMutex.unlock() | ||
| }.onFailure { | ||
| _lightningState.update { state -> state.copy(lastSyncError = it) } | ||
| startSyncRetryLoopIfNeeded() | ||
| } | ||
|
|
||
| Result.success(Unit) | ||
| } | ||
|
|
||
| fun syncAsync() = scope.launch { | ||
|
|
@@ -349,6 +423,16 @@ class LightningRepo @Inject constructor( | |
| } | ||
| } | ||
|
|
||
| private suspend fun ensureSyncedBeforeSend(): Result<Unit> { | ||
| Logger.debug("Ensuring wallet is synced before send", context = TAG) | ||
| return sync().fold( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Did you notice what the docs say about https://docs.rs/ldk-node/latest/ldk_node/struct.Node.html#method.sync_wallets Just wanting to make sure this was part of the equation. Maybe the docs are wrong, and we need to sync at certain points to ensure correctness. I wouldn't be surprised. |
||
| onSuccess = { Result.success(Unit) }, | ||
| onFailure = { | ||
| Result.failure(SyncUnhealthyError()) | ||
| }, | ||
| ) | ||
| } | ||
|
|
||
| /** Clear pending sync flag. Called when manual pull-to-refresh takes priority. */ | ||
| fun clearPendingSync() = syncPending.set(false) | ||
|
|
||
|
|
@@ -634,6 +718,11 @@ class LightningRepo @Inject constructor( | |
| ): Result<Txid> = executeWhenNodeRunning("sendOnChain") { | ||
| require(address.isNotEmpty()) { "Send address cannot be empty" } | ||
|
|
||
| // Ensure wallet is synced before sending to have up-to-date state | ||
| ensureSyncedBeforeSend().onFailure { | ||
| return@executeWhenNodeRunning Result.failure(it) | ||
| } | ||
|
|
||
| val transactionSpeed = speed ?: settingsStore.data.first().defaultTransactionSpeed | ||
| val satsPerVByte = getFeeRateForSpeed(transactionSpeed, feeRates).getOrThrow() | ||
|
|
||
|
|
@@ -960,6 +1049,7 @@ class LightningRepo @Inject constructor( | |
| private const val TAG = "LightningRepo" | ||
| private const val LENGTH_CHANNEL_ID_PREVIEW = 10 | ||
| private const val MS_SYNC_LOOP_DEBOUNCE = 500L | ||
| private const val SYNC_RETRY_DELAY_MS = 15_000L | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -968,6 +1058,7 @@ class NodeSetupError : AppError("Unknown node setup error") | |
| class NodeStopTimeoutError : AppError("Timeout waiting for node to stop") | ||
| class NodeRunTimeoutError(opName: String) : AppError("Timeout waiting for node to run and execute: '$opName'") | ||
| class GetPaymentsError : AppError("It wasn't possible get the payments") | ||
| class SyncUnhealthyError : AppError("Wallet sync failed before send") | ||
|
|
||
| data class LightningState( | ||
| val nodeId: String = "", | ||
|
|
@@ -978,6 +1069,15 @@ data class LightningState( | |
| val balances: BalanceDetails? = null, | ||
| val isSyncingWallet: Boolean = false, | ||
| val isGeoBlocked: Boolean = false, | ||
| val lastSyncError: Throwable? = null, | ||
| val lastSuccessfulSyncAt: Long? = null, | ||
| ) { | ||
| fun block(): BestBlock? = nodeStatus?.currentBestBlock | ||
|
|
||
| /** | ||
| * Returns true if the node has synced successfully at least once and the last sync didn't fail. | ||
| * This is used to determine if critical operations like sending should be allowed. | ||
| */ | ||
| val isSyncHealthy: Boolean | ||
| get() = lastSyncError == null && lastSuccessfulSyncAt != null | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.