Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 61 additions & 48 deletions app/src/main/java/to/bitkit/repositories/HealthRepo.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import org.lightningdevkit.ldknode.ChannelDetails
import to.bitkit.data.CacheStore
import to.bitkit.di.BgDispatcher
import to.bitkit.models.BackupCategory
Expand Down Expand Up @@ -43,68 +44,80 @@ class HealthRepo @Inject constructor(
observeBackupStatus()
}

@Suppress("CyclomaticComplexMethod")
private fun collectState() {
val internetHealthState = connectivityRepo.isOnline.map { connectivityState ->
when (connectivityState) {
ConnectivityState.CONNECTED -> HealthState.READY
ConnectivityState.CONNECTING -> HealthState.PENDING
ConnectivityState.DISCONNECTED -> HealthState.ERROR
}
}
val internetHealthState = connectivityRepo.isOnline.map { it.asHealth() }

repoScope.launch {
combine(
internetHealthState,
lightningRepo.lightningState,
) { internetHealth, lightningState ->
val isOnline = internetHealth == HealthState.READY
val nodeLifecycleState = lightningState.nodeLifecycleState

val nodeHealth = when {
!isOnline -> HealthState.ERROR
else -> nodeLifecycleState.asHealth()
}

val electrumHealth = when {
!isOnline -> HealthState.ERROR
nodeLifecycleState.isRunning() -> HealthState.READY
nodeLifecycleState.canRun() -> HealthState.PENDING
else -> HealthState.ERROR
}

val channelsHealth = when {
!isOnline -> HealthState.ERROR
else -> {
val channels = lightningState.channels
val hasOpenChannels = channels.any { it.isChannelReady }
val hasPendingChannels = channels.any { !it.isChannelReady }

when {
hasOpenChannels -> HealthState.READY
hasPendingChannels -> HealthState.PENDING
else -> HealthState.ERROR
}
}
}

AppHealthState(
internet = internetHealth,
electrum = electrumHealth,
node = nodeHealth,
channels = channelsHealth,
)
computeHealthState(internetHealth, lightningState)
}.collect { newHealthState ->
updateState { currentState ->
newHealthState.copy(
backups = currentState.backups,
app = currentState.app,
updateState {
it.copy(
internet = newHealthState.internet,
electrum = newHealthState.electrum,
node = newHealthState.node,
channels = newHealthState.channels,
)
}
}
}
}

@Suppress("CyclomaticComplexMethod")
private fun computeHealthState(internetHealth: HealthState, lightningState: LightningState): AppHealthState {
val isOnline = internetHealth == HealthState.READY
val nodeLifecycleState = lightningState.nodeLifecycleState
val isSyncing = lightningState.isSyncingWallet
val hasSyncError = lightningState.lastSyncError != null

val nodeHealth = when {
!isOnline -> HealthState.ERROR
isSyncing -> HealthState.PENDING
hasSyncError && nodeLifecycleState.isRunning() -> HealthState.ERROR
else -> nodeLifecycleState.asHealth()
}

val electrumHealth = when {
!isOnline -> HealthState.ERROR
isSyncing -> HealthState.PENDING
hasSyncError && nodeLifecycleState.isRunning() -> HealthState.ERROR
nodeLifecycleState.isRunning() -> HealthState.READY
nodeLifecycleState.canRun() -> HealthState.PENDING
else -> HealthState.ERROR
}

val channelsHealth = when {
!isOnline -> HealthState.ERROR
else -> computeChannelsHealth(lightningState.channels)
}

return AppHealthState(
internet = internetHealth,
electrum = electrumHealth,
node = nodeHealth,
channels = channelsHealth,
)
}

private fun computeChannelsHealth(channels: List<ChannelDetails>): HealthState {
val hasOpenChannels = channels.any { it.isChannelReady }
val hasPendingChannels = channels.any { !it.isChannelReady }
return when {
hasOpenChannels -> HealthState.READY
hasPendingChannels -> HealthState.PENDING
else -> HealthState.ERROR
}
}

private fun ConnectivityState.asHealth() = when (this) {
ConnectivityState.CONNECTED -> HealthState.READY
ConnectivityState.CONNECTING -> HealthState.PENDING
ConnectivityState.DISCONNECTED -> HealthState.ERROR
}

private fun observePaidOrdersState() {
repoScope.launch {
blocktankRepo.blocktankState.map { it.paidOrders }.distinctUntilChanged().collect { paidOrders ->
Expand Down
110 changes: 105 additions & 5 deletions app/src/main/java/to/bitkit/repositories/LightningRepo.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@ import com.synonym.bitkitcore.createWithdrawCallbackUrl
import com.synonym.bitkitcore.lnurlAuth
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.tasks.await
Expand Down Expand Up @@ -64,6 +68,7 @@ import to.bitkit.utils.Logger
import to.bitkit.utils.ServiceError
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import javax.inject.Inject
import javax.inject.Singleton
import kotlin.coroutines.cancellation.CancellationException
Expand All @@ -84,6 +89,7 @@ class LightningRepo @Inject constructor(
private val lnurlService: LnurlService,
private val cacheStore: CacheStore,
private val preActivityMetadataRepo: PreActivityMetadataRepo,
private val connectivityRepo: ConnectivityRepo,
) {
private val _lightningState = MutableStateFlow(LightningState())
val lightningState = _lightningState.asStateFlow()
Expand All @@ -101,6 +107,66 @@ class LightningRepo @Inject constructor(

private val syncMutex = Mutex()
private val syncPending = AtomicBoolean(false)
private val syncRetryJob = AtomicReference<Job?>(null)

init {
observeConnectivityForSyncRetry()
}

private fun observeConnectivityForSyncRetry() {
scope.launch {
connectivityRepo.isOnline
.map { it == ConnectivityState.CONNECTED }
.distinctUntilChanged()
.collect { isConnected ->
if (!isConnected) {
// Cancel any pending retry when disconnected
syncRetryJob.getAndSet(null)?.cancel()
return@collect
}

// Start retry loop if sync is failing
startSyncRetryLoopIfNeeded()
}
}
}

private fun startSyncRetryLoopIfNeeded() {
val state = _lightningState.value
if (!state.nodeLifecycleState.isRunning() || state.lastSyncError == null) {
return
}

// Don't start if already retrying
if (syncRetryJob.get()?.isActive == true) {
return
}

val job = scope.launch {
// Don't start retry loop if offline
if (connectivityRepo.isOnline.first() != ConnectivityState.CONNECTED) {
return@launch
}

while (isActive) {
val currentState = _lightningState.value
// Stop if no longer running or sync is now healthy
if (!currentState.nodeLifecycleState.isRunning() || currentState.isSyncHealthy) {
Logger.debug("Sync retry loop stopped: node not running or sync healthy", context = TAG)
break
}

delay(SYNC_RETRY_DELAY_MS)
Logger.info("Retrying sync after failure", context = TAG)
sync().onSuccess {
Logger.info("Sync retry succeeded", context = TAG)
}.onFailure {
Logger.warn("Sync retry failed, will retry in ${SYNC_RETRY_DELAY_MS / 1000}s", it, context = TAG)
}
}
}
syncRetryJob.set(job)
}

/**
* Executes the provided operation only if the node is running.
Expand Down Expand Up @@ -318,6 +384,7 @@ class LightningRepo @Inject constructor(
}
}

@Suppress("TooGenericExceptionCaught")
suspend fun sync(): Result<Unit> = executeWhenNodeRunning("sync") {
// If sync is in progress, mark pending and skip
if (!syncMutex.tryLock()) {
Expand All @@ -326,21 +393,28 @@ class LightningRepo @Inject constructor(
return@executeWhenNodeRunning Result.success(Unit)
}

try {
runCatching {
do {
syncPending.set(false)
_lightningState.update { it.copy(isSyncingWallet = true) }
lightningService.sync()
refreshChannelCache()
syncState()
_lightningState.update {
it.copy(
lastSyncError = null,
lastSuccessfulSyncAt = System.currentTimeMillis(),
)
}
if (syncPending.get()) delay(MS_SYNC_LOOP_DEBOUNCE)
} while (syncPending.getAndSet(false))
} finally {
_lightningState.update { it.copy(isSyncingWallet = false) }
}.also {
_lightningState.update { state -> state.copy(isSyncingWallet = false) }
syncMutex.unlock()
}.onFailure {
_lightningState.update { state -> state.copy(lastSyncError = it) }
startSyncRetryLoopIfNeeded()
}

Result.success(Unit)
}

fun syncAsync() = scope.launch {
Expand All @@ -349,6 +423,16 @@ class LightningRepo @Inject constructor(
}
}

private suspend fun ensureSyncedBeforeSend(): Result<Unit> {
Logger.debug("Ensuring wallet is synced before send", context = TAG)
return sync().fold(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Did you notice what the docs say about node.syncWallets?!

https://docs.rs/ldk-node/latest/ldk_node/struct.Node.html#method.sync_wallets

Just wanting to make sure this was part of the equation. Maybe the docs are wrong, and we need to sync at certain points to ensure correctness. I wouldn't be surprised.

onSuccess = { Result.success(Unit) },
onFailure = {
Result.failure(SyncUnhealthyError())
},
)
}

/** Clear pending sync flag. Called when manual pull-to-refresh takes priority. */
fun clearPendingSync() = syncPending.set(false)

Expand Down Expand Up @@ -634,6 +718,11 @@ class LightningRepo @Inject constructor(
): Result<Txid> = executeWhenNodeRunning("sendOnChain") {
require(address.isNotEmpty()) { "Send address cannot be empty" }

// Ensure wallet is synced before sending to have up-to-date state
ensureSyncedBeforeSend().onFailure {
return@executeWhenNodeRunning Result.failure(it)
}

val transactionSpeed = speed ?: settingsStore.data.first().defaultTransactionSpeed
val satsPerVByte = getFeeRateForSpeed(transactionSpeed, feeRates).getOrThrow()

Expand Down Expand Up @@ -960,6 +1049,7 @@ class LightningRepo @Inject constructor(
private const val TAG = "LightningRepo"
private const val LENGTH_CHANNEL_ID_PREVIEW = 10
private const val MS_SYNC_LOOP_DEBOUNCE = 500L
private const val SYNC_RETRY_DELAY_MS = 15_000L
}
}

Expand All @@ -968,6 +1058,7 @@ class NodeSetupError : AppError("Unknown node setup error")
class NodeStopTimeoutError : AppError("Timeout waiting for node to stop")
class NodeRunTimeoutError(opName: String) : AppError("Timeout waiting for node to run and execute: '$opName'")
class GetPaymentsError : AppError("It wasn't possible get the payments")
class SyncUnhealthyError : AppError("Wallet sync failed before send")

data class LightningState(
val nodeId: String = "",
Expand All @@ -978,6 +1069,15 @@ data class LightningState(
val balances: BalanceDetails? = null,
val isSyncingWallet: Boolean = false,
val isGeoBlocked: Boolean = false,
val lastSyncError: Throwable? = null,
val lastSuccessfulSyncAt: Long? = null,
) {
fun block(): BestBlock? = nodeStatus?.currentBestBlock

/**
* Returns true if the node has synced successfully at least once and the last sync didn't fail.
* This is used to determine if critical operations like sending should be allowed.
*/
val isSyncHealthy: Boolean
get() = lastSyncError == null && lastSuccessfulSyncAt != null
}
2 changes: 1 addition & 1 deletion app/src/main/java/to/bitkit/repositories/WalletRepo.kt
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ class WalletRepo @Inject constructor(
eventSyncJob?.cancel()
eventSyncJob = repoScope.launch {
delay(EVENT_SYNC_DEBOUNCE_MS)
syncNodeAndWallet()
syncBalances()
transferRepo.syncTransferStates()
}
}
Expand Down
Loading
Loading