Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.

## Unreleased

### Breaking changes

* `Append` has no more `exit` method. Users should compose `logforth::core::default_logger().flush()` with their own graceful shutdown logic.
* `Async` appender's `flush` method is now blocking until all buffered logs are flushed by worker threads. Any errors during flushing will be propagated back to the `flush` caller.

## [0.29.1] 2025-11-03

### Bug fixes
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ insta = { version = "1.43.2" }
jiff = { version = "0.2" }
libc = { version = "0.2.162" }
log = { version = "0.4.27", features = ["kv_std", "kv_sval"] }
oneshot = { version = "0.1.11", default-features = false, features = ["std"] }
opentelemetry = { version = "0.31.0", default-features = false }
opentelemetry-otlp = { version = "0.31.0", default-features = false }
opentelemetry_sdk = { version = "0.31.0", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions appenders/async/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ rustdoc-args = ["--cfg", "docsrs"]
arc-swap = { workspace = true }
crossbeam-channel = { workspace = true }
logforth-core = { workspace = true }
oneshot = { workspace = true }

[lints]
workspace = true
55 changes: 14 additions & 41 deletions appenders/async/src/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use logforth_core::Append;
use logforth_core::Diagnostic;
use logforth_core::Error;
Expand All @@ -31,10 +29,7 @@ use crate::worker::Worker;
/// A composable appender, logging and flushing asynchronously.
#[derive(Debug)]
pub struct Async {
appends: Arc<[Box<dyn Append>]>,
overflow: Overflow,
state: AsyncState,
trap: Arc<dyn Trap>,
}

impl Append for Async {
Expand All @@ -46,38 +41,24 @@ impl Append for Async {
d.visit(&mut collector)?;
}

let overflow = self.overflow;
let task = Task::Log {
appends: self.appends.clone(),
record: Box::new(record.to_owned()),
diags: diagnostics,
};
self.state.send_task(task, overflow)
self.state.send_task(task)
}

fn flush(&self) -> Result<(), Error> {
let overflow = self.overflow;
let task = Task::Flush {
appends: self.appends.clone(),
};
self.state.send_task(task, overflow)
}
let (done_tx, done_rx) = oneshot::channel();

let task = Task::Flush { done: done_tx };
self.state.send_task(task)?;

fn exit(&self) -> Result<(), Error> {
// If the program is tearing down, this will be the final flush. `crossbeam`
// uses thread-local internally, which is not supported in `atexit` callback.
// This can be bypassed by flushing sinks directly on the current thread, but
// before we do that we have to join the thread to ensure that any pending log
// tasks are completed.
//
// @see https://github.com/SpriteOvO/spdlog-rs/issues/64
self.state.destroy();
for append in self.appends.iter() {
if let Err(err) = append.exit() {
self.trap.trap(&err);
}
match done_rx.recv() {
Ok(None) => Ok(()),
Ok(Some(err)) => Err(err),
Err(err) => Err(Error::new("worker exited before completing flush").with_source(err)),
}
Ok(())
}
}

Expand All @@ -86,7 +67,7 @@ pub struct AsyncBuilder {
thread_name: String,
appends: Vec<Box<dyn Append>>,
buffered_lines_limit: Option<usize>,
trap: Arc<dyn Trap>,
trap: Box<dyn Trap>,
overflow: Overflow,
}

Expand All @@ -97,7 +78,7 @@ impl AsyncBuilder {
thread_name: thread_name.into(),
appends: vec![],
buffered_lines_limit: None,
trap: Arc::new(BestEffortTrap::default()),
trap: Box::new(BestEffortTrap::default()),
overflow: Overflow::Block,
}
}
Expand All @@ -122,7 +103,6 @@ impl AsyncBuilder {

/// Set the trap for this async appender.
pub fn trap(mut self, trap: impl Into<Box<dyn Trap>>) -> Self {
let trap = trap.into();
self.trap = trap.into();
self
}
Expand All @@ -143,26 +123,19 @@ impl AsyncBuilder {
overflow,
} = self;

let appends = appends.into_boxed_slice().into();

let (sender, receiver) = match buffered_lines_limit {
Some(limit) => crossbeam_channel::bounded(limit),
None => crossbeam_channel::unbounded(),
};

let worker = Worker::new(receiver, trap.clone());
let worker = Worker::new(appends, receiver, trap);
let thread_handle = std::thread::Builder::new()
.name(thread_name)
.spawn(move || worker.run())
.expect("failed to spawn async appender thread");
let state = AsyncState::new(sender, thread_handle);

Async {
appends,
overflow,
state,
trap,
}
let state = AsyncState::new(overflow, sender, thread_handle);
Async { state }
}
}

Expand Down
7 changes: 2 additions & 5 deletions appenders/async/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@

#![cfg_attr(docsrs, feature(doc_cfg))]

use std::sync::Arc;

use logforth_core::Append;
use logforth_core::Error;
use logforth_core::kv;
use logforth_core::record::RecordOwned;

Expand All @@ -31,12 +29,11 @@ pub use self::append::AsyncBuilder;

enum Task {
Log {
appends: Arc<[Box<dyn Append>]>,
record: Box<RecordOwned>,
diags: Vec<(kv::KeyOwned, kv::ValueOwned)>,
},
Flush {
appends: Arc<[Box<dyn Append>]>,
done: oneshot::Sender<Option<Error>>,
},
}

Expand Down
20 changes: 14 additions & 6 deletions appenders/async/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,27 @@ pub(crate) struct AsyncState(ArcSwapOption<State>);

#[derive(Debug)]
struct State {
overflow: Overflow,
sender: Sender<Task>,
handle: JoinHandle<()>,
}

impl AsyncState {
pub(crate) fn new(sender: Sender<Task>, handle: JoinHandle<()>) -> Self {
let state = State { sender, handle };
Self(ArcSwapOption::from(Some(Arc::new(state))))
pub(crate) fn new(overflow: Overflow, sender: Sender<Task>, handle: JoinHandle<()>) -> Self {
Self(ArcSwapOption::from(Some(Arc::new(State {
overflow,
sender,
handle,
}))))
}

pub(crate) fn send_task(&self, task: Task, overflow: Overflow) -> Result<(), Error> {
pub(crate) fn send_task(&self, task: Task) -> Result<(), Error> {
let state = self.0.load();
// SAFETY: state is always Some before dropped.
let state = state.as_ref().unwrap();
let sender = &state.sender;

match overflow {
match state.overflow {
Overflow::Block => sender.send(task).map_err(|err| {
Error::new(match err.0 {
Task::Log { .. } => "failed to send log task to async appender",
Expand All @@ -66,7 +70,11 @@ impl AsyncState {
pub(crate) fn destroy(&self) {
if let Some(state) = self.0.swap(None) {
// SAFETY: state has always one strong count before swapped.
let State { sender, handle } = Arc::into_inner(state).unwrap();
let State {
overflow: _,
sender,
handle,
} = Arc::into_inner(state).unwrap();

// drop our sender, threads will break the loop after receiving and processing
drop(sender);
Expand Down
43 changes: 28 additions & 15 deletions appenders/async/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use crossbeam_channel::Receiver;
use logforth_core::Append;
use logforth_core::Diagnostic;
use logforth_core::Error;
use logforth_core::Trap;
Expand All @@ -24,25 +23,34 @@ use logforth_core::kv::Visitor;
use crate::Task;

pub(crate) struct Worker {
appends: Vec<Box<dyn Append>>,
receiver: Receiver<Task>,
trap: Arc<dyn Trap>,
trap: Box<dyn Trap>,
}

impl Worker {
pub(crate) fn new(receiver: Receiver<Task>, trap: Arc<dyn Trap>) -> Self {
Self { receiver, trap }
pub(crate) fn new(
appends: Vec<Box<dyn Append>>,
receiver: Receiver<Task>,
trap: Box<dyn Trap>,
) -> Self {
Self {
appends,
receiver,
trap,
}
}

pub(crate) fn run(self) {
let Self { receiver, trap } = self;
let Self {
appends,
receiver,
trap,
} = self;

while let Ok(task) = receiver.recv() {
match task {
Task::Log {
appends,
record,
diags,
} => {
Task::Log { record, diags } => {
let diags: &[Box<dyn Diagnostic>] = if diags.is_empty() {
&[]
} else {
Expand All @@ -51,18 +59,23 @@ impl Worker {
let record = record.as_record();
for append in appends.iter() {
if let Err(err) = append.append(&record, diags) {
let err = Error::new("failed to append record").set_source(err);
let err = Error::new("failed to append record").with_source(err);
trap.trap(&err);
}
}
}
Task::Flush { appends } => {
Task::Flush { done } => {
let mut error = None;
for append in appends.iter() {
if let Err(err) = append.flush() {
let err = Error::new("failed to flush").set_source(err);
trap.trap(&err);
error = Some(
error
.unwrap_or_else(|| Error::new("failed to flush appender"))
.with_source(err),
);
}
}
let _ = done.send(error);
}
}
}
Expand Down
Loading