135 changes: 74 additions & 61 deletions lightning-liquidity/src/lsps5/service.rs
@@ -245,84 +245,97 @@ where
// introduce some batching to upper-bound the number of requests inflight at any given
// time.

let mut did_persist = false;

if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
// If we're not the first event processor to get here, just return early, the increment
// we just did will be treated as "go around again" at the end.
return Ok(did_persist);
return Ok(false);
}

let mut did_persist = false;

loop {
let mut need_remove = Vec::new();
let mut need_persist = Vec::new();
match self.do_persist().await {
Ok(pass_did_persist) => did_persist |= pass_did_persist,
Err(e) => {
self.persistence_in_flight.store(0, Ordering::Release);
return Err(e);
},
}

self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap());
{
let outer_state_lock = self.per_peer_state.read().unwrap();

for (client_id, peer_state) in outer_state_lock.iter() {
let is_prunable = peer_state.is_prunable();
let has_open_channel = self.client_has_open_channel(client_id);
if is_prunable && !has_open_channel {
need_remove.push(*client_id);
} else if peer_state.needs_persist {
need_persist.push(*client_id);
}
}
if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
// If another thread incremented the state while we were running we should go
// around again, but only once.
self.persistence_in_flight.store(1, Ordering::Release);
continue;
}
break;
}

for client_id in need_persist.into_iter() {
debug_assert!(!need_remove.contains(&client_id));
self.persist_peer_state(client_id).await?;
did_persist = true;
Ok(did_persist)
}

async fn do_persist(&self) -> Result<bool, lightning::io::Error> {
let mut did_persist = false;
let mut need_remove = Vec::new();
let mut need_persist = Vec::new();

self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap());
{
let outer_state_lock = self.per_peer_state.read().unwrap();

for (client_id, peer_state) in outer_state_lock.iter() {
let is_prunable = peer_state.is_prunable();
let has_open_channel = self.client_has_open_channel(client_id);
if is_prunable && !has_open_channel {
need_remove.push(*client_id);
} else if peer_state.needs_persist {
need_persist.push(*client_id);
}
}
}

for client_id in need_remove {
let mut future_opt = None;
{
// We need to take the `per_peer_state` write lock to remove an entry, but also
// have to hold it until after the `remove` call returns (but not through
// future completion) to ensure that writes for the peer's state are
// well-ordered with other `persist_peer_state` calls even across the removal
// itself.
let mut per_peer_state = self.per_peer_state.write().unwrap();
if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) {
let state = entry.get_mut();
if state.is_prunable() && !self.client_has_open_channel(&client_id) {
entry.remove();
let key = client_id.to_string();
future_opt = Some(self.kv_store.remove(
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
&key,
true,
));
} else {
// If the peer was re-added, force a re-persist of the current state.
state.needs_persist = true;
}
for client_id in need_persist.into_iter() {
debug_assert!(!need_remove.contains(&client_id));
self.persist_peer_state(client_id).await?;
did_persist = true;
}

for client_id in need_remove {
let mut future_opt = None;
{
// We need to take the `per_peer_state` write lock to remove an entry, but also
// have to hold it until after the `remove` call returns (but not through
// future completion) to ensure that writes for the peer's state are
// well-ordered with other `persist_peer_state` calls even across the removal
// itself.
let mut per_peer_state = self.per_peer_state.write().unwrap();
if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) {
let state = entry.get_mut();
if state.is_prunable() && !self.client_has_open_channel(&client_id) {
entry.remove();
let key = client_id.to_string();
future_opt = Some(self.kv_store.remove(
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
&key,
true,
));
} else {
// This should never happen, we can only have one `persist` call
// in-progress at once and map entries are only removed by it.
debug_assert!(false);
// If the peer was re-added, force a re-persist of the current state.
state.needs_persist = true;
}
}
if let Some(future) = future_opt {
future.await?;
did_persist = true;
} else {
self.persist_peer_state(client_id).await?;
// This should never happen, we can only have one `persist` call
// in-progress at once and map entries are only removed by it.
debug_assert!(false);
}
}

if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
// If another thread incremented the state while we were running we should go
// around again, but only once.
self.persistence_in_flight.store(1, Ordering::Release);
continue;
if let Some(future) = future_opt {
future.await?;
did_persist = true;
} else {
self.persist_peer_state(client_id).await?;
}
break;
}

Ok(did_persist)
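For readers skimming the hunk above, the following is a simplified, self-contained sketch of the single-flight counter pattern that the new `persist`/`do_persist` split implements. The synchronous `Persister` type here is an illustrative stand-in, not the crate's actual async handler; only the `persistence_in_flight` counter, the `do_persist` helper, and the error-path reset mirror the real change.

use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};

struct Persister {
	persistence_in_flight: AtomicUsize,
}

impl Persister {
	fn persist(&self) -> Result<bool, io::Error> {
		if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
			// Another call is already persisting; our increment tells it to go
			// around again, so we can return without doing any work ourselves.
			return Ok(false);
		}

		let mut did_persist = false;
		loop {
			match self.do_persist() {
				Ok(pass_did_persist) => did_persist |= pass_did_persist,
				Err(e) => {
					// The fix: reset the counter before bailing out. Otherwise it
					// stays above zero and every later call returns Ok(false)
					// immediately, never persisting again.
					self.persistence_in_flight.store(0, Ordering::Release);
					return Err(e);
				},
			}

			if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
				// Another caller queued work while we ran; do exactly one more pass.
				self.persistence_in_flight.store(1, Ordering::Release);
				continue;
			}
			break;
		}
		Ok(did_persist)
	}

	fn do_persist(&self) -> Result<bool, io::Error> {
		// Stand-in for the per-peer prune/persist pass in the real handler.
		Ok(true)
	}
}

The `fetch_add`/`fetch_sub` pair lets concurrent callers hand their work to the already-running pass instead of blocking on a lock, which is exactly why the error path has to put the counter back to zero itself.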
170 changes: 170 additions & 0 deletions lightning-liquidity/tests/lsps5_integration_tests.rs
@@ -1633,3 +1633,173 @@ fn lsps5_service_handler_persistence_across_restarts() {
}
}
}

struct FailableKVStore {
inner: TestStore,
fail_lsps5: std::sync::atomic::AtomicBool,
}

impl FailableKVStore {
fn new() -> Self {
Self { inner: TestStore::new(false), fail_lsps5: std::sync::atomic::AtomicBool::new(false) }
}

fn set_fail_lsps5(&self, fail: bool) {
self.fail_lsps5.store(fail, std::sync::atomic::Ordering::SeqCst);
}
}

impl lightning::util::persist::KVStoreSync for FailableKVStore {
fn read(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> lightning::io::Result<Vec<u8>> {
<TestStore as lightning::util::persist::KVStoreSync>::read(
&self.inner,
primary_namespace,
secondary_namespace,
key,
)
}

fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
) -> lightning::io::Result<()> {
if secondary_namespace == "lsps5_service"
&& self.fail_lsps5.load(std::sync::atomic::Ordering::SeqCst)
{
return Err(lightning::io::Error::new(
lightning::io::ErrorKind::Other,
"intentional failure for lsps5 namespace",
));
}
<TestStore as lightning::util::persist::KVStoreSync>::write(
&self.inner,
primary_namespace,
secondary_namespace,
key,
buf,
)
}

fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
) -> lightning::io::Result<()> {
if secondary_namespace == "lsps5_service"
&& self.fail_lsps5.load(std::sync::atomic::Ordering::SeqCst)
{
return Err(lightning::io::Error::new(
lightning::io::ErrorKind::Other,
"intentional failure for lsps5 namespace",
));
}
<TestStore as lightning::util::persist::KVStoreSync>::remove(
&self.inner,
primary_namespace,
secondary_namespace,
key,
lazy,
)
}

fn list(
&self, primary_namespace: &str, secondary_namespace: &str,
) -> lightning::io::Result<Vec<String>> {
<TestStore as lightning::util::persist::KVStoreSync>::list(
&self.inner,
primary_namespace,
secondary_namespace,
)
}
}

#[test]
fn lsps5_service_persist_resets_in_flight_counter_on_io_error() {
use lightning::ln::peer_handler::CustomMessageHandler;

let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

let service_kv_store = Arc::new(FailableKVStore::new());
let client_kv_store = Arc::new(TestStore::new(false));

let service_config = LiquidityServiceConfig {
lsps1_service_config: None,
lsps2_service_config: None,
lsps5_service_config: Some(LSPS5ServiceConfig::default()),
advertise_service: true,
};
let client_config = LiquidityClientConfig {
lsps1_client_config: None,
lsps2_client_config: None,
lsps5_client_config: Some(LSPS5ClientConfig::default()),
};
let time_provider: Arc<dyn TimeProvider + Send + Sync> = Arc::new(DefaultTimeProvider);

let service_lm = LiquidityManagerSync::new_with_custom_time_provider(
nodes[0].keys_manager,
nodes[0].keys_manager,
nodes[0].node,
Arc::clone(&service_kv_store),
nodes[0].tx_broadcaster,
Some(service_config),
None,
Arc::clone(&time_provider),
)
.unwrap();

let client_lm = LiquidityManagerSync::new_with_custom_time_provider(
nodes[1].keys_manager,
nodes[1].keys_manager,
nodes[1].node,
client_kv_store,
nodes[1].tx_broadcaster,
None,
Some(client_config),
Arc::clone(&time_provider),
)
.unwrap();

let service_node_id = nodes[0].node.get_our_node_id();
let client_node_id = nodes[1].node.get_our_node_id();

create_chan_between_nodes(&nodes[0], &nodes[1]);

let client_handler = client_lm.lsps5_client_handler().unwrap();
client_handler
.set_webhook(service_node_id, "App".to_string(), "https://example.org/hook".to_string())
.unwrap();

let req_msgs = client_lm.get_and_clear_pending_msg();
assert_eq!(req_msgs.len(), 1);
let (_, request) = req_msgs.into_iter().next().unwrap();
service_lm.handle_custom_message(request, client_node_id).unwrap();

// Consume the SendWebhookNotification event so pending events queue is drained.
let _ = service_lm.next_event();
let _ = service_lm.get_and_clear_pending_msg();

// Initial persist should succeed and clear all needs_persist flags.
service_lm.persist().expect("initial persist should succeed");

// Now arrange for lsps5 writes to fail and dirty lsps5 state without dirtying
// pending_events (which lives in a different namespace).
service_kv_store.set_fail_lsps5(true);
service_lm.peer_disconnected(client_node_id);

// First persist attempt should error out due to the failing kv_store.
let res1 = service_lm.persist();
assert!(res1.is_err(), "persist should fail when lsps5 kv_store write fails");

// Second persist attempt must still attempt the write (and fail again). With the
// bug, the LSPS5 service handler's `persistence_in_flight` counter is left above
// zero on error so this returns Ok(false) immediately, silently dropping the
// pending state and breaking persistence forever.
let res2 = service_lm.persist();
assert!(
res2.is_err(),
"after a failed persist, subsequent persist calls must still attempt to persist; got {:?}",
res2,
);
}
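A natural extra assertion, hypothetical and not part of the diff above, would be to clear the injected failure and confirm that persistence recovers, which is only possible once the error path resets the in-flight counter:

// Hypothetical follow-up: once writes stop failing, persist() should succeed
// again because the fixed error path reset `persistence_in_flight` to zero.
service_kv_store.set_fail_lsps5(false);
service_lm.persist().expect("persist should succeed once the kv_store recovers");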