Skip to content

Commit e33ba0d

Browse files
sanityclaude
andauthored
fix(fdev): wait for server response before closing WebSocket connection (#2280)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 2a0e674 commit e33ba0d

File tree

4 files changed

+337
-57
lines changed

4 files changed

+337
-57
lines changed

crates/core/src/contract/handler.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,85 @@ pub mod test {
664664

665665
Ok(())
666666
}
667+
668+
/// Regression test for issue #2278: Verifies that send_to_sender returns
669+
/// an error when the response receiver is dropped, but does NOT crash or
670+
/// break the channel.
671+
///
672+
/// This tests that the channel infrastructure supports the fix in
673+
/// contract/mod.rs where we changed `send_to_sender()?` to non-propagating
674+
/// error handling, so the handler loop can continue even when a response
675+
/// can't be delivered.
676+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
677+
async fn send_to_sender_fails_gracefully_when_receiver_dropped() -> anyhow::Result<()> {
678+
let (send_halve, mut rcv_halve, _) = contract_handler_channel();
679+
680+
let contract = ContractContainer::Wasm(ContractWasmAPIVersion::V1(WrappedContract::new(
681+
Arc::new(ContractCode::from(vec![0, 1, 2, 3])),
682+
Parameters::from(vec![4, 5]),
683+
)));
684+
let key = contract.key();
685+
686+
// Send a request
687+
let h = GlobalExecutor::spawn({
688+
async move {
689+
send_halve
690+
.send_to_handler(ContractHandlerEvent::PutQuery {
691+
key,
692+
state: vec![6, 7, 8].into(),
693+
related_contracts: RelatedContracts::default(),
694+
contract: None,
695+
})
696+
.await
697+
}
698+
});
699+
700+
// Receive the event from the handler side
701+
let (id, ev) =
702+
tokio::time::timeout(Duration::from_millis(100), rcv_halve.recv_from_sender())
703+
.await??;
704+
705+
// Verify it's a PutQuery
706+
let ContractHandlerEvent::PutQuery { state, .. } = ev else {
707+
anyhow::bail!("expected PutQuery event");
708+
};
709+
assert_eq!(state.as_ref(), &[6, 7, 8]);
710+
711+
// Abort the request handler task - this drops the oneshot receiver
712+
// simulating a client disconnect
713+
h.abort();
714+
// Wait a bit for the abort to take effect
715+
tokio::time::sleep(Duration::from_millis(10)).await;
716+
717+
// Try to send the response - this should fail because the receiver was dropped
718+
// when we aborted the task. In the actual contract_handling loop (after the fix),
719+
// this would just log and continue rather than propagating the error.
720+
let send_result = rcv_halve
721+
.send_to_sender(
722+
id,
723+
ContractHandlerEvent::PutResponse {
724+
new_value: Ok(vec![0, 7].into()),
725+
},
726+
)
727+
.await;
728+
729+
// send_to_sender should fail because receiver was dropped
730+
assert!(
731+
send_result.is_err(),
732+
"send_to_sender should fail when receiver is dropped, got {:?}",
733+
send_result
734+
);
735+
736+
// Verify the error is the expected NoEvHandlerResponse
737+
// Use super:: to disambiguate from freenet_stdlib::prelude::ContractError
738+
assert!(
739+
matches!(send_result, Err(super::ContractError::NoEvHandlerResponse)),
740+
"Expected NoEvHandlerResponse error, got {:?}",
741+
send_result
742+
);
743+
744+
Ok(())
745+
}
667746
}
668747

669748
pub(super) mod in_memory {

crates/core/src/contract/mod.rs

Lines changed: 67 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ where
5555
phase = "get_complete",
5656
"Fetched contract"
5757
);
58-
contract_handler
58+
// Send response back to caller. If the caller disconnected, just log and continue.
59+
if let Err(error) = contract_handler
5960
.channel()
6061
.send_to_sender(
6162
id,
@@ -65,13 +66,13 @@ where
6566
},
6667
)
6768
.await
68-
.map_err(|error| {
69-
tracing::debug!(
70-
error = %error,
71-
"Shutting down contract handler"
72-
);
73-
error
74-
})?;
69+
{
70+
tracing::debug!(
71+
error = %error,
72+
contract = %key,
73+
"Failed to send GET response (client may have disconnected)"
74+
);
75+
}
7576
}
7677
Err(err) => {
7778
tracing::warn!(
@@ -89,7 +90,8 @@ where
8990
);
9091
return Err(ContractError::FatalExecutorError { key, error: err });
9192
}
92-
contract_handler
93+
// Send error response back to caller. If the caller disconnected, just log and continue.
94+
if let Err(error) = contract_handler
9395
.channel()
9496
.send_to_sender(
9597
id,
@@ -99,13 +101,13 @@ where
99101
},
100102
)
101103
.await
102-
.map_err(|error| {
103-
tracing::debug!(
104-
error = %error,
105-
"Shutting down contract handler"
106-
);
107-
error
108-
})?;
104+
{
105+
tracing::debug!(
106+
error = %error,
107+
contract = %key,
108+
"Failed to send GET error response (client may have disconnected)"
109+
);
110+
}
109111
}
110112
}
111113
}
@@ -168,17 +170,20 @@ where
168170
}
169171
};
170172

171-
contract_handler
173+
// Send response back to caller. If the caller disconnected (e.g., WebSocket closed),
174+
// the response channel may be dropped. This is not fatal - the contract has already
175+
// been stored, so we just log and continue processing other events.
176+
if let Err(error) = contract_handler
172177
.channel()
173178
.send_to_sender(id, event_result)
174179
.await
175-
.map_err(|error| {
176-
tracing::debug!(
177-
error = %error,
178-
"Shutting down contract handler"
179-
);
180-
error
181-
})?;
180+
{
181+
tracing::debug!(
182+
error = %error,
183+
contract = %key,
184+
"Failed to send PUT response (client may have disconnected)"
185+
);
186+
}
182187
}
183188
ContractHandlerEvent::UpdateQuery {
184189
key,
@@ -258,17 +263,19 @@ where
258263
}
259264
};
260265

261-
contract_handler
266+
// Send response back to caller. If the caller disconnected, the response channel
267+
// may be dropped. This is not fatal - the update has already been applied.
268+
if let Err(error) = contract_handler
262269
.channel()
263270
.send_to_sender(id, event_result)
264271
.await
265-
.map_err(|error| {
266-
tracing::debug!(
267-
error = %error,
268-
"Shutting down contract handler"
269-
);
270-
error
271-
})?;
272+
{
273+
tracing::debug!(
274+
error = %error,
275+
contract = %key,
276+
"Failed to send UPDATE response (client may have disconnected)"
277+
);
278+
}
272279
}
273280
ContractHandlerEvent::DelegateRequest {
274281
req,
@@ -309,17 +316,19 @@ where
309316
}
310317
};
311318

312-
contract_handler
319+
// Send response back to caller. If the caller disconnected, the response channel
320+
// may be dropped. This is not fatal - the delegate has already been processed.
321+
if let Err(error) = contract_handler
313322
.channel()
314323
.send_to_sender(id, ContractHandlerEvent::DelegateResponse(response))
315324
.await
316-
.map_err(|error| {
317-
tracing::debug!(
318-
error = %error,
319-
"Shutting down contract handler"
320-
);
321-
error
322-
})?;
325+
{
326+
tracing::debug!(
327+
error = %error,
328+
delegate_key = %delegate_key,
329+
"Failed to send DELEGATE response (client may have disconnected)"
330+
);
331+
}
323332
}
324333
ContractHandlerEvent::RegisterSubscriberListener {
325334
key,
@@ -340,14 +349,19 @@ where
340349
);
341350
});
342351

343-
// FIXME: if there is an error senc actually an error back
344-
contract_handler
352+
// FIXME: if there is an error send actually an error back
353+
// If the caller disconnected, just log and continue.
354+
if let Err(error) = contract_handler
345355
.channel()
346356
.send_to_sender(id, ContractHandlerEvent::RegisterSubscriberListenerResponse)
347357
.await
348-
.inspect_err(|error| {
349-
tracing::debug!(%error, "shutting down contract handler");
350-
})?;
358+
{
359+
tracing::debug!(
360+
error = %error,
361+
contract = %key,
362+
"Failed to send RegisterSubscriberListener response (client may have disconnected)"
363+
);
364+
}
351365
}
352366
ContractHandlerEvent::QuerySubscriptions { callback } => {
353367
// Get subscription information from the executor and send it through the callback
@@ -362,13 +376,17 @@ where
362376
.send(crate::message::QueryResult::NetworkDebug(network_debug))
363377
.await;
364378

365-
contract_handler
379+
// If the caller disconnected, just log and continue.
380+
if let Err(error) = contract_handler
366381
.channel()
367382
.send_to_sender(id, ContractHandlerEvent::QuerySubscriptionsResponse)
368383
.await
369-
.inspect_err(|error| {
370-
tracing::debug!(%error, "shutting down contract handler");
371-
})?;
384+
{
385+
tracing::debug!(
386+
error = %error,
387+
"Failed to send QuerySubscriptions response (client may have disconnected)"
388+
);
389+
}
372390
}
373391
_ => unreachable!("ContractHandlerEvent enum should be exhaustive here"),
374392
}

crates/fdev/src/commands.rs

Lines changed: 66 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use freenet_stdlib::prelude::{
55
ContractCode, ContractContainer, ContractWasmAPIVersion, Parameters, WrappedContract,
66
};
77
use freenet_stdlib::{
8-
client_api::{ClientRequest, ContractRequest, DelegateRequest, WebApi},
8+
client_api::{
9+
ClientRequest, ContractRequest, ContractResponse, DelegateRequest, HostResponse, WebApi,
10+
},
911
prelude::*,
1012
};
1113
use xz2::read::XzDecoder;
@@ -170,9 +172,26 @@ async fn put_contract(
170172
tracing::debug!("Starting WebSocket client connection");
171173
let mut client = start_api_client(other).await?;
172174
tracing::debug!("WebSocket client connected successfully");
173-
let result = execute_command(request, &mut client).await;
174-
tracing::debug!(success = ?result.is_ok(), "WebSocket client operation complete");
175-
result
175+
execute_command(request, &mut client).await?;
176+
177+
// Wait for server response before closing connection
178+
let response = client
179+
.recv()
180+
.await
181+
.map_err(|e| anyhow::anyhow!("Failed to receive response: {e}"))?;
182+
183+
match response {
184+
HostResponse::ContractResponse(ContractResponse::PutResponse { key: response_key }) => {
185+
tracing::info!(%response_key, "Contract published successfully");
186+
Ok(())
187+
}
188+
HostResponse::ContractResponse(other) => {
189+
anyhow::bail!("Unexpected contract response: {:?}", other)
190+
}
191+
other => {
192+
anyhow::bail!("Unexpected response type: {:?}", other)
193+
}
194+
}
176195
}
177196

178197
async fn put_delegate(
@@ -204,15 +223,32 @@ For additional hardening is recommended to use a different cipher and nonce to e
204223
(cipher, nonce)
205224
};
206225

207-
println!("Putting delegate {} ", delegate.key().encode());
226+
let delegate_key = delegate.key().clone();
227+
println!("Putting delegate {} ", delegate_key.encode());
208228
let request = DelegateRequest::RegisterDelegate {
209229
delegate,
210230
cipher,
211231
nonce,
212232
}
213233
.into();
214234
let mut client = start_api_client(other).await?;
215-
execute_command(request, &mut client).await
235+
execute_command(request, &mut client).await?;
236+
237+
// Wait for server response before closing connection
238+
let response = client
239+
.recv()
240+
.await
241+
.map_err(|e| anyhow::anyhow!("Failed to receive response: {e}"))?;
242+
243+
match response {
244+
HostResponse::DelegateResponse { key, values } => {
245+
tracing::info!(%key, response_count = values.len(), "Delegate registered successfully");
246+
Ok(())
247+
}
248+
other => {
249+
anyhow::bail!("Unexpected response type: {:?}", other)
250+
}
251+
}
216252
}
217253

218254
#[derive(clap::Parser, Clone, Debug)]
@@ -253,7 +289,7 @@ pub async fn update(config: UpdateConfig, other: BaseConfig) -> anyhow::Result<(
253289
if config.release {
254290
anyhow::bail!("Cannot publish contracts in the network yet");
255291
}
256-
let key = ContractInstanceId::try_from(config.key)?.into();
292+
let key: ContractKey = ContractInstanceId::try_from(config.key)?.into();
257293
println!("Updating contract {key}");
258294
let data = {
259295
let mut buf = vec![];
@@ -262,7 +298,29 @@ pub async fn update(config: UpdateConfig, other: BaseConfig) -> anyhow::Result<(
262298
};
263299
let request = ContractRequest::Update { key, data }.into();
264300
let mut client = start_api_client(other).await?;
265-
execute_command(request, &mut client).await
301+
execute_command(request, &mut client).await?;
302+
303+
// Wait for server response before closing connection
304+
let response = client
305+
.recv()
306+
.await
307+
.map_err(|e| anyhow::anyhow!("Failed to receive response: {e}"))?;
308+
309+
match response {
310+
HostResponse::ContractResponse(ContractResponse::UpdateResponse {
311+
key: response_key,
312+
summary,
313+
}) => {
314+
tracing::info!(%response_key, ?summary, "Contract updated successfully");
315+
Ok(())
316+
}
317+
HostResponse::ContractResponse(other) => {
318+
anyhow::bail!("Unexpected contract response: {:?}", other)
319+
}
320+
other => {
321+
anyhow::bail!("Unexpected response type: {:?}", other)
322+
}
323+
}
266324
}
267325

268326
pub(crate) async fn start_api_client(cfg: BaseConfig) -> anyhow::Result<WebApi> {

0 commit comments

Comments
 (0)