Skip to content

Commit efe3286

Browse files
authored
feat: notify application of state changes (#326)
* Make most state transitions go through apply_transition * Make check_end_of_resend go through apply_transition * Make graceful logout go through apply_transition * Remove logic to send logout message in AwaitingLogout state as it's no longer allowed * Notify application on state changes * Fix failing tests
1 parent 10f9f32 commit efe3286

File tree

8 files changed

+105
-48
lines changed

8 files changed

+105
-48
lines changed

crates/hotfix/src/application.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::message::OutboundMessage;
2+
use crate::session::Status;
23
use hotfix_message::message::Message;
34

45
#[async_trait::async_trait]
@@ -18,6 +19,11 @@ pub trait Application: Send + Sync + 'static {
1819
async fn on_logout(&mut self, reason: &str);
1920
/// Called when the session is logged on.
2021
async fn on_logon(&mut self);
22+
/// Called when the session state changes.
23+
///
24+
/// This is invoked after every state transition, providing the previous
25+
/// and new status. The default implementation does nothing.
26+
async fn on_state_change(&self, from: &Status, to: &Status);
2127
}
2228

2329
/// Standard FIX Business Reject Reason values (tag 380).

crates/hotfix/src/initiator.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ mod tests {
161161
use crate::message::logon::{Logon, ResetSeqNumConfig};
162162
use crate::message::logout::Logout;
163163
use crate::message::parser::Parser;
164+
use crate::session::Status;
164165
use crate::store::in_memory::InMemoryMessageStore;
165166
use hotfix_message::Part;
166167
use hotfix_message::message::Message;
@@ -195,6 +196,8 @@ mod tests {
195196
}
196197
async fn on_logout(&mut self, _reason: &str) {}
197198
async fn on_logon(&mut self) {}
199+
200+
async fn on_state_change(&self, _from: &Status, _to: &Status) {}
198201
}
199202

200203
/// A minimal FIX counterparty for testing the Initiator over TCP.

crates/hotfix/src/session.rs

Lines changed: 65 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ where
223223
.handle_verification_issue(&mut self.ctx, &message, flags)
224224
.await?
225225
{
226-
self.apply_transition(result);
226+
self.apply_transition(result).await;
227227
return Ok(());
228228
}
229229

@@ -286,25 +286,28 @@ where
286286
}
287287

288288
async fn check_end_of_resend(&mut self) -> Result<(), SessionOperationError> {
289-
let ended_state = if let SessionState::AwaitingResend(state) = &mut self.state {
289+
let backlog = if let SessionState::AwaitingResend(state) = &mut self.state {
290290
if self.ctx.store.next_target_seq_number() > state.end_seq_number {
291+
let inbound_queue = std::mem::take(&mut state.inbound_queue);
291292
let new_state = SessionState::new_active(
292293
state.writer.clone(),
293294
self.ctx.config.heartbeat_interval,
294295
);
295-
Some(std::mem::replace(&mut self.state, new_state))
296+
self.apply_transition(TransitionResult::TransitionTo(new_state))
297+
.await;
298+
Some(inbound_queue)
296299
} else {
297300
None
298301
}
299302
} else {
300303
None
301304
};
302305

303-
if let Some(SessionState::AwaitingResend(mut state)) = ended_state {
306+
if let Some(mut inbound_queue) = backlog {
304307
// we have reached the end of the resend,
305308
// process queued messages and resume normal operation
306309
debug!("resend is done, processing backlog");
307-
while let Some(msg) = state.inbound_queue.pop_front() {
310+
while let Some(msg) = inbound_queue.pop_front() {
308311
let seq_number: u64 = msg.get(MSG_SEQ_NUM).unwrap_or_else(|e| {
309312
error!("failed to get seq number: {:?}", e);
310313
0
@@ -328,39 +331,48 @@ where
328331
}
329332

330333
async fn on_connect(&mut self, writer: WriterRef) -> Result<(), SessionOperationError> {
331-
self.state = SessionState::AwaitingLogon(AwaitingLogonState {
332-
writer,
333-
logon_sent: false,
334-
logon_timeout: Instant::now() + Duration::from_secs(self.ctx.config.logon_timeout),
335-
});
334+
self.apply_transition(TransitionResult::TransitionTo(SessionState::AwaitingLogon(
335+
AwaitingLogonState {
336+
writer,
337+
logon_sent: false,
338+
logon_timeout: Instant::now() + Duration::from_secs(self.ctx.config.logon_timeout),
339+
},
340+
)))
341+
.await;
336342
self.reset_peer_timer(None);
337343
self.send_logon().await?;
338344

339345
Ok(())
340346
}
341347

342348
async fn on_disconnect(&mut self, reason: String) {
343-
match self.state {
349+
let transition = match self.state {
344350
SessionState::Active(_)
345351
| SessionState::AwaitingLogon(_)
346352
| SessionState::AwaitingResend(_) => {
347353
self.state.disconnect_writer().await;
348-
self.state = SessionState::new_disconnected(true, &reason);
354+
TransitionResult::TransitionTo(SessionState::new_disconnected(true, &reason))
349355
}
350356
SessionState::Disconnected(_) => {
351-
warn!("disconnect message was received, but the session is already disconnected")
357+
warn!("disconnect message was received, but the session is already disconnected");
358+
TransitionResult::Stay
352359
}
353360
SessionState::AwaitingLogout(AwaitingLogoutState { reconnect, .. }) => {
354-
self.state = SessionState::new_disconnected(reconnect, &reason);
361+
TransitionResult::TransitionTo(SessionState::new_disconnected(reconnect, &reason))
355362
}
356-
}
363+
};
364+
self.apply_transition(transition).await;
357365
}
358366

359367
async fn on_logon(&mut self) -> Result<(), SessionOperationError> {
360368
if let SessionState::AwaitingLogon(AwaitingLogonState { writer, .. }) = &self.state {
361369
let writer = writer.clone();
362370
// happy logon flow, the session is now active
363-
self.state = SessionState::new_active(writer, self.ctx.config.heartbeat_interval);
371+
self.apply_transition(TransitionResult::TransitionTo(SessionState::new_active(
372+
writer,
373+
self.ctx.config.heartbeat_interval,
374+
)))
375+
.await;
364376
self.ctx.application.on_logon().await;
365377
self.ctx.store.increment_target_seq_number().await?;
366378
} else {
@@ -388,12 +400,18 @@ where
388400
// if we initiated the logout, preserve the reconnect flag
389401
SessionState::AwaitingLogout(AwaitingLogoutState { reconnect, .. }) => {
390402
self.state.disconnect_writer().await;
391-
self.state = SessionState::new_disconnected(reconnect, "logout completed");
403+
self.apply_transition(TransitionResult::TransitionTo(
404+
SessionState::new_disconnected(reconnect, "logout completed"),
405+
))
406+
.await;
392407
}
393408
// otherwise assume it makes sense to try to reconnect
394409
_ => {
395410
self.state.disconnect_writer().await;
396-
self.state = SessionState::new_disconnected(true, "peer has logged us out")
411+
self.apply_transition(TransitionResult::TransitionTo(
412+
SessionState::new_disconnected(true, "peer has logged us out"),
413+
))
414+
.await;
397415
}
398416
}
399417

@@ -462,9 +480,17 @@ where
462480
Ok(())
463481
}
464482

465-
fn apply_transition(&mut self, result: TransitionResult) {
483+
async fn apply_transition(&mut self, result: TransitionResult) {
466484
if let TransitionResult::TransitionTo(new_state) = result {
485+
let old_status = self.state.as_status();
467486
self.state = new_state;
487+
let new_status = self.state.as_status();
488+
if old_status != new_status {
489+
self.ctx
490+
.application
491+
.on_state_change(&old_status, &new_status)
492+
.await;
493+
}
468494
}
469495
}
470496

@@ -532,7 +558,10 @@ where
532558
self.state
533559
.logout_and_terminate(&mut self.ctx, "internal error")
534560
.await;
535-
self.state = SessionState::new_disconnected(true, &reason);
561+
self.apply_transition(TransitionResult::TransitionTo(
562+
SessionState::new_disconnected(true, &reason),
563+
))
564+
.await;
536565
}
537566
}
538567
SessionEvent::Disconnected(reason) => {
@@ -575,12 +604,13 @@ where
575604
match request {
576605
AdminRequest::InitiateGracefulShutdown { reconnect } => {
577606
warn!("initiating shutdown on request from admin..");
578-
if let Err(err) = self
607+
match self
579608
.state
580609
.initiate_graceful_logout(&mut self.ctx, "explicitly requested", reconnect)
581610
.await
582611
{
583-
error!(err = ?err, "initiating graceful shutdown");
612+
Ok(result) => self.apply_transition(result).await,
613+
Err(err) => error!(err = ?err, "initiating graceful shutdown"),
584614
}
585615
}
586616
AdminRequest::RequestSessionInfo(responder) => {
@@ -646,8 +676,10 @@ where
646676
.await;
647677
if let Err(err) = self.ctx.store.reset().await {
648678
error!("error resetting session store: {err:}");
649-
self.state =
650-
SessionState::new_disconnected(false, "unexpected error in reset");
679+
self.apply_transition(TransitionResult::TransitionTo(
680+
SessionState::new_disconnected(false, "unexpected error in reset"),
681+
))
682+
.await;
651683
}
652684
}
653685
Ok(SessionPeriodComparison::OutsideSessionTime { .. }) => {
@@ -659,8 +691,10 @@ where
659691
.await;
660692
if let Err(err) = self.ctx.store.reset().await {
661693
error!("error resetting session store: {err:}");
662-
self.state =
663-
SessionState::new_disconnected(false, "unexpected error in reset");
694+
self.apply_transition(TransitionResult::TransitionTo(
695+
SessionState::new_disconnected(false, "unexpected error in reset"),
696+
))
697+
.await;
664698
}
665699
}
666700
Err(err) => {
@@ -673,12 +707,13 @@ where
673707
}
674708
} else if self.state.is_connected() {
675709
// we are currently outside scheduled session time
676-
if let Err(err) = self
710+
match self
677711
.state
678712
.initiate_graceful_logout(&mut self.ctx, "End of session time", true)
679713
.await
680714
{
681-
error!(err = ?err, "failed to initiate graceful logout");
715+
Ok(result) => self.apply_transition(result).await,
716+
Err(err) => error!(err = ?err, "failed to initiate graceful logout"),
682717
}
683718
}
684719

@@ -887,6 +922,8 @@ mod tests {
887922
}
888923
async fn on_logout(&mut self, _: &str) {}
889924
async fn on_logon(&mut self) {}
925+
926+
async fn on_state_change(&self, _from: &Status, _to: &Status) {}
890927
}
891928

892929
fn create_writer_ref() -> WriterRef {

crates/hotfix/src/session/state.rs

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::message::OutboundMessage;
1515
use crate::message::logon::Logon;
1616
use crate::message::logout::Logout;
1717
use crate::message::verification::VerificationFlags;
18-
use crate::session::ctx::{SessionCtx, VerificationResult};
18+
use crate::session::ctx::{SessionCtx, TransitionResult, VerificationResult};
1919
use crate::session::error::{InternalSendError, InternalSendResultExt, SessionOperationError};
2020
use crate::session::event::ScheduleResponse;
2121
use crate::session::info::Status as SessionInfoStatus;
@@ -108,12 +108,8 @@ impl SessionState {
108108
}
109109
_ => error!("invalid outgoing message for AwaitingLogon state"),
110110
},
111-
Self::AwaitingLogout(AwaitingLogoutState { writer, .. }) => {
112-
// Logout messages are allowed because we first transition into AwaitingLogout
113-
// and only then send the logout message
114-
if message_type == Logout::MSG_TYPE {
115-
writer.send_raw_message(raw).await
116-
}
111+
Self::AwaitingLogout(_) => {
112+
error!("trying to send message while awaiting logout");
117113
}
118114
_ => error!("trying to write without an established connection"),
119115
}
@@ -144,25 +140,24 @@ impl SessionState {
144140
}
145141

146142
pub fn try_transition_to_awaiting_logout(
147-
&mut self,
143+
&self,
148144
logout_timeout: Duration,
149145
reconnect: bool,
150-
) -> bool {
146+
) -> TransitionResult {
151147
if matches!(self, SessionState::AwaitingLogout(_)) {
152148
debug!("already in awaiting logout state");
153-
return false;
149+
return TransitionResult::Stay;
154150
}
155151

156152
if let Some(writer) = self.get_writer() {
157-
*self = SessionState::AwaitingLogout(AwaitingLogoutState {
153+
TransitionResult::TransitionTo(SessionState::AwaitingLogout(AwaitingLogoutState {
158154
writer: writer.clone(),
159155
logout_timeout: Instant::now() + logout_timeout,
160156
reconnect,
161-
});
162-
true
157+
}))
163158
} else {
164159
error!("trying to transition to awaiting logout without an established connection");
165-
false
160+
TransitionResult::Stay
166161
}
167162
}
168163

@@ -202,19 +197,20 @@ impl SessionState {
202197
ctx: &mut SessionCtx<A, S>,
203198
reason: &str,
204199
reconnect: bool,
205-
) -> Result<(), SessionOperationError>
200+
) -> Result<TransitionResult, SessionOperationError>
206201
where
207202
A: Application,
208203
S: MessageStore,
209204
{
210-
if self.try_transition_to_awaiting_logout(
205+
let result = self.try_transition_to_awaiting_logout(
211206
Duration::from_secs(ctx.config.logout_timeout),
212207
reconnect,
213-
) {
208+
);
209+
if matches!(result, TransitionResult::TransitionTo(_)) {
214210
self.send_logout(ctx, reason).await?;
215211
}
216212

217-
Ok(())
213+
Ok(result)
218214
}
219215

220216
/// Sends a logout message and immediately disconnects the counterparty.

crates/hotfix/tests/connection_test_cases/helpers.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use std::sync::{Arc, Once};
1010
use hotfix::Application;
1111
use hotfix::application::{InboundDecision, OutboundDecision};
1212
use hotfix::message::OutboundMessage;
13+
use hotfix::session::Status;
1314
use hotfix_message::message::Message;
1415
use rcgen::{CertificateParams, DnType, IsCa, KeyPair, KeyUsagePurpose, SanType};
1516
use rustls::ServerConfig;
@@ -355,4 +356,6 @@ impl Application for MinimalApplication {
355356
async fn on_logout(&mut self, _reason: &str) {}
356357

357358
async fn on_logon(&mut self) {}
359+
360+
async fn on_state_change(&self, _from: &Status, _to: &Status) {}
358361
}

crates/hotfix/tests/session_test_cases/common/fakes/fake_application.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::common::test_messages::TestMessage;
22
use hotfix::Application;
33
use hotfix::application::{InboundDecision, OutboundDecision};
4+
use hotfix::session::Status;
45
use hotfix_message::message::Message;
56
use std::sync::Mutex;
67

@@ -75,4 +76,6 @@ impl Application for FakeApplication {
7576
async fn on_logout(&mut self, _reason: &str) {}
7677

7778
async fn on_logon(&mut self) {}
79+
80+
async fn on_state_change(&self, _from: &Status, _to: &Status) {}
7881
}

examples/load-testing/src/application.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::messages::{ExecutionReport, InboundMsg, OutboundMsg};
22
use hotfix::Application;
33
use hotfix::Message;
44
use hotfix::application::{InboundDecision, OutboundDecision};
5+
use hotfix::session::Status;
56
use tokio::sync::mpsc::UnboundedSender;
67
use tracing::info;
78

@@ -51,4 +52,8 @@ impl Application for LoadTestingApplication {
5152
async fn on_logon(&mut self) {
5253
info!("we've been logged in");
5354
}
55+
56+
async fn on_state_change(&self, from: &Status, to: &Status) {
57+
info!("we've changed from {:?} to {:?}", from, to);
58+
}
5459
}

0 commit comments

Comments
 (0)