Skip to content

Commit 4f10962

Browse files
authored
Add PgListener::next_buffered(), to support batch processing of notifications (#3560)
* Implement and test PgListener::try_recv_buffered(). * rustfmt * Fix warnings. * Fix test. * Rename try_recv_buffered() -> next_buffered().
1 parent 503a72c commit 4f10962

File tree

2 files changed

+84
-2
lines changed

2 files changed

+84
-2
lines changed

sqlx-postgres/src/listener.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,8 @@ impl PgListener {
255255
pub async fn try_recv(&mut self) -> Result<Option<PgNotification>, Error> {
256256
// Flush the buffer first, if anything
257257
// This would only fill up if this listener is used as a connection
258-
if let Ok(Some(notification)) = self.buffer_rx.try_next() {
259-
return Ok(Some(PgNotification(notification)));
258+
if let Some(notification) = self.next_buffered() {
259+
return Ok(Some(notification));
260260
}
261261

262262
// Fetch our `CloseEvent` listener, if applicable.
@@ -319,6 +319,19 @@ impl PgListener {
319319
}
320320
}
321321

322+
/// Receives the next notification that already exists in the connection buffer, if any.
323+
///
324+
/// This is similar to `try_recv`, except it will not wait if the connection has not yet received a notification.
325+
///
326+
/// This is helpful if you want to retrieve all buffered notifications and process them in batches.
327+
pub fn next_buffered(&mut self) -> Option<PgNotification> {
328+
if let Ok(Some(notification)) = self.buffer_rx.try_next() {
329+
Some(PgNotification(notification))
330+
} else {
331+
None
332+
}
333+
}
334+
322335
/// Consume this listener, returning a `Stream` of notifications.
323336
///
324337
/// The backing connection will be automatically reconnected should it be lost.

tests/postgres/postgres.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1057,6 +1057,75 @@ async fn test_listener_cleanup() -> anyhow::Result<()> {
10571057
Ok(())
10581058
}
10591059

1060+
#[sqlx_macros::test]
1061+
async fn test_listener_try_recv_buffered() -> anyhow::Result<()> {
1062+
use sqlx_core::rt::timeout;
1063+
1064+
use sqlx::pool::PoolOptions;
1065+
use sqlx::postgres::PgListener;
1066+
1067+
// Create a connection on which to send notifications
1068+
let mut notify_conn = new::<Postgres>().await?;
1069+
1070+
let pool = PoolOptions::<Postgres>::new()
1071+
.min_connections(1)
1072+
.max_connections(1)
1073+
.test_before_acquire(true)
1074+
.connect(&env::var("DATABASE_URL")?)
1075+
.await?;
1076+
1077+
let mut listener = PgListener::connect_with(&pool).await?;
1078+
listener.listen("test_channel2").await?;
1079+
1080+
// Checks for a notification on the test channel
1081+
async fn try_recv(listener: &mut PgListener) -> anyhow::Result<bool> {
1082+
match timeout(Duration::from_millis(100), listener.recv()).await {
1083+
Ok(res) => {
1084+
res?;
1085+
Ok(true)
1086+
}
1087+
Err(_) => Ok(false),
1088+
}
1089+
}
1090+
1091+
// Check no notification is buffered, since we haven't sent one.
1092+
assert!(listener.next_buffered().is_none());
1093+
1094+
// Send five notifications transactionally, so they all arrive at once.
1095+
{
1096+
let mut txn = notify_conn.begin().await?;
1097+
for i in 0..5 {
1098+
txn.execute(format!("NOTIFY test_channel2, 'payload {i}'").as_str())
1099+
.await?;
1100+
}
1101+
txn.commit().await?;
1102+
}
1103+
1104+
// Still no notifications buffered, since we haven't awaited the listener yet.
1105+
assert!(listener.next_buffered().is_none());
1106+
1107+
// Activate connection.
1108+
sqlx::query!("SELECT 1 AS one")
1109+
.fetch_all(&mut listener)
1110+
.await?;
1111+
1112+
// The next five notifications should now be buffered.
1113+
for i in 0..5 {
1114+
assert!(
1115+
listener.next_buffered().is_some(),
1116+
"Notification {i} was not buffered"
1117+
);
1118+
}
1119+
1120+
// Should be no more.
1121+
assert!(listener.next_buffered().is_none());
1122+
1123+
// Even if we wait.
1124+
assert!(!try_recv(&mut listener).await?, "Notification received");
1125+
1126+
Ok(())
1127+
}
1128+
10601129
#[sqlx_macros::test]
10611130
async fn test_pg_listener_allows_pool_to_close() -> anyhow::Result<()> {
10621131
let pool = pool::<Postgres>().await?;

0 commit comments

Comments
 (0)