From 6e5023535114f1064a941959fb1521702ccf4bcf Mon Sep 17 00:00:00 2001 From: Sim-hu Date: Sun, 8 Mar 2026 12:45:57 +0900 Subject: [PATCH] examples: add Server-Sent Events (SSE) example Add an example demonstrating how to implement Server-Sent Events using hyper's streaming response API. The example uses `StreamBody` with `futures_util::stream::unfold` to send timestamped JSON events to connected clients once per second. SSE is a common pattern for real-time web applications, and no existing example covers the `text/event-stream` content type or the SSE wire format. --- Cargo.toml | 5 ++ examples/server_sent_events.rs | 87 ++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) create mode 100644 examples/server_sent_events.rs diff --git a/Cargo.toml b/Cargo.toml index f0169edff7..08f17e20f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -196,6 +196,11 @@ path = "examples/upgrades.rs" required-features = ["full"] +[[example]] +name = "server_sent_events" +path = "examples/server_sent_events.rs" +required-features = ["full"] + [[example]] name = "web_api" path = "examples/web_api.rs" diff --git a/examples/server_sent_events.rs b/examples/server_sent_events.rs new file mode 100644 index 0000000000..b58e898d96 --- /dev/null +++ b/examples/server_sent_events.rs @@ -0,0 +1,87 @@ +#![deny(warnings)] + +//! A Server-Sent Events (SSE) example. +//! +//! This server sends a stream of timestamped events to connected clients +//! using the `text/event-stream` content type. +//! +//! ```not_rust +//! cargo run --features="full" --example server_sent_events +//! ``` +//! +//! Then connect with: +//! +//! ```not_rust +//! curl -N http://127.0.0.1:3000/ +//! ``` + +use std::convert::Infallible; +use std::net::SocketAddr; +use std::time::Duration; + +use bytes::Bytes; +use http_body_util::{combinators::BoxBody, BodyExt, StreamBody}; +use hyper::body::Frame; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Request, Response}; +use tokio::net::TcpListener; + +#[path = "../benches/support/mod.rs"] +mod support; +use support::TokioIo; + +async fn sse( + _: Request, +) -> Result>, Infallible> { + // Build an infinite stream that yields one SSE frame per second. + let stream = futures_util::stream::unfold(0u64, |counter| async move { + tokio::time::sleep(Duration::from_secs(1)).await; + let counter = counter + 1; + + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); + + // SSE format: each field ends with \n, events separated by \n\n + let event = format!( + "id: {counter}\nevent: tick\ndata: {{\"counter\":{counter},\"timestamp\":{now}}}\n\n" + ); + let frame: Result, Infallible> = Ok(Frame::data(Bytes::from(event))); + Some((frame, counter)) + }); + + let body = StreamBody::new(stream).boxed(); + + let response = Response::builder() + .header("content-type", "text/event-stream") + .header("cache-control", "no-cache") + .body(body) + .unwrap(); + + Ok(response) +} + +#[tokio::main] +pub async fn main() -> Result<(), Box> { + pretty_env_logger::init(); + + let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); + let listener = TcpListener::bind(addr).await?; + println!("Listening on http://{addr}"); + + loop { + let (stream, _) = listener.accept().await?; + let io = TokioIo::new(stream); + + tokio::task::spawn(async move { + if let Err(err) = http1::Builder::new() + .serve_connection(io, service_fn(sse)) + .await + { + eprintln!("Error serving connection: {err:?}"); + } + }); + } +}