From ff1279211843cbfc7d49127b02b505e20bc3c588 Mon Sep 17 00:00:00 2001 From: tison Date: Sun, 4 Jan 2026 12:15:20 +0800 Subject: [PATCH] refactor: rework logger API Signed-off-by: tison Co-authored-by: TennyZhuang Signed-off-by: tison --- CHANGELOG.md | 5 ++ Cargo.toml | 1 + appenders/async/Cargo.toml | 1 + appenders/async/src/append.rs | 55 ++++-------- appenders/async/src/lib.rs | 7 +- appenders/async/src/state.rs | 20 +++-- appenders/async/src/worker.rs | 43 ++++++---- appenders/async/tests/flushes.rs | 107 ++++++++++++++++++++++++ appenders/fastrace/src/lib.rs | 6 -- appenders/file/src/rolling.rs | 22 ++--- appenders/journald/src/lib.rs | 2 +- appenders/opentelemetry/src/lib.rs | 2 +- core/src/append/mod.rs | 11 --- core/src/error.rs | 50 ++++++----- core/src/logger/log_impl.rs | 84 +------------------ logforth/Cargo.toml | 5 -- logforth/examples/asynchronous.rs | 3 + logforth/tests/global_async_sink.rs | 125 ---------------------------- 18 files changed, 218 insertions(+), 331 deletions(-) create mode 100644 appenders/async/tests/flushes.rs delete mode 100644 logforth/tests/global_async_sink.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f03075..10ffdd9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. ## Unreleased +### Breaking changes + +* `Append` has no more `exit` method. Users should compose `logforth::core::default_logger().flush()` with their own graceful shutdown logic. +* `Async` appender's `flush` method is now blocking until all buffered logs are flushed by worker threads. Any errors during flushing will be propagated back to the `flush` caller. + ## [0.29.1] 2025-11-03 ### Bug fixes diff --git a/Cargo.toml b/Cargo.toml index c160d77..80f7bfc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,7 @@ insta = { version = "1.43.2" } jiff = { version = "0.2" } libc = { version = "0.2.162" } log = { version = "0.4.27", features = ["kv_std", "kv_sval"] } +oneshot = { version = "0.1.11", default-features = false, features = ["std"] } opentelemetry = { version = "0.31.0", default-features = false } opentelemetry-otlp = { version = "0.31.0", default-features = false } opentelemetry_sdk = { version = "0.31.0", default-features = false } diff --git a/appenders/async/Cargo.toml b/appenders/async/Cargo.toml index 2f39ae7..cc298f3 100644 --- a/appenders/async/Cargo.toml +++ b/appenders/async/Cargo.toml @@ -35,6 +35,7 @@ rustdoc-args = ["--cfg", "docsrs"] arc-swap = { workspace = true } crossbeam-channel = { workspace = true } logforth-core = { workspace = true } +oneshot = { workspace = true } [lints] workspace = true diff --git a/appenders/async/src/append.rs b/appenders/async/src/append.rs index 9022327..60aed83 100644 --- a/appenders/async/src/append.rs +++ b/appenders/async/src/append.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use logforth_core::Append; use logforth_core::Diagnostic; use logforth_core::Error; @@ -31,10 +29,7 @@ use crate::worker::Worker; /// A composable appender, logging and flushing asynchronously. #[derive(Debug)] pub struct Async { - appends: Arc<[Box]>, - overflow: Overflow, state: AsyncState, - trap: Arc, } impl Append for Async { @@ -46,38 +41,24 @@ impl Append for Async { d.visit(&mut collector)?; } - let overflow = self.overflow; let task = Task::Log { - appends: self.appends.clone(), record: Box::new(record.to_owned()), diags: diagnostics, }; - self.state.send_task(task, overflow) + self.state.send_task(task) } fn flush(&self) -> Result<(), Error> { - let overflow = self.overflow; - let task = Task::Flush { - appends: self.appends.clone(), - }; - self.state.send_task(task, overflow) - } + let (done_tx, done_rx) = oneshot::channel(); + + let task = Task::Flush { done: done_tx }; + self.state.send_task(task)?; - fn exit(&self) -> Result<(), Error> { - // If the program is tearing down, this will be the final flush. `crossbeam` - // uses thread-local internally, which is not supported in `atexit` callback. - // This can be bypassed by flushing sinks directly on the current thread, but - // before we do that we have to join the thread to ensure that any pending log - // tasks are completed. - // - // @see https://github.com/SpriteOvO/spdlog-rs/issues/64 - self.state.destroy(); - for append in self.appends.iter() { - if let Err(err) = append.exit() { - self.trap.trap(&err); - } + match done_rx.recv() { + Ok(None) => Ok(()), + Ok(Some(err)) => Err(err), + Err(err) => Err(Error::new("worker exited before completing flush").with_source(err)), } - Ok(()) } } @@ -86,7 +67,7 @@ pub struct AsyncBuilder { thread_name: String, appends: Vec>, buffered_lines_limit: Option, - trap: Arc, + trap: Box, overflow: Overflow, } @@ -97,7 +78,7 @@ impl AsyncBuilder { thread_name: thread_name.into(), appends: vec![], buffered_lines_limit: None, - trap: Arc::new(BestEffortTrap::default()), + trap: Box::new(BestEffortTrap::default()), overflow: Overflow::Block, } } @@ -122,7 +103,6 @@ impl AsyncBuilder { /// Set the trap for this async appender. pub fn trap(mut self, trap: impl Into>) -> Self { - let trap = trap.into(); self.trap = trap.into(); self } @@ -143,26 +123,19 @@ impl AsyncBuilder { overflow, } = self; - let appends = appends.into_boxed_slice().into(); - let (sender, receiver) = match buffered_lines_limit { Some(limit) => crossbeam_channel::bounded(limit), None => crossbeam_channel::unbounded(), }; - let worker = Worker::new(receiver, trap.clone()); + let worker = Worker::new(appends, receiver, trap); let thread_handle = std::thread::Builder::new() .name(thread_name) .spawn(move || worker.run()) .expect("failed to spawn async appender thread"); - let state = AsyncState::new(sender, thread_handle); - Async { - appends, - overflow, - state, - trap, - } + let state = AsyncState::new(overflow, sender, thread_handle); + Async { state } } } diff --git a/appenders/async/src/lib.rs b/appenders/async/src/lib.rs index bbe46f8..edf945e 100644 --- a/appenders/async/src/lib.rs +++ b/appenders/async/src/lib.rs @@ -16,9 +16,7 @@ #![cfg_attr(docsrs, feature(doc_cfg))] -use std::sync::Arc; - -use logforth_core::Append; +use logforth_core::Error; use logforth_core::kv; use logforth_core::record::RecordOwned; @@ -31,12 +29,11 @@ pub use self::append::AsyncBuilder; enum Task { Log { - appends: Arc<[Box]>, record: Box, diags: Vec<(kv::KeyOwned, kv::ValueOwned)>, }, Flush { - appends: Arc<[Box]>, + done: oneshot::Sender>, }, } diff --git a/appenders/async/src/state.rs b/appenders/async/src/state.rs index 7de02bf..3e86af8 100644 --- a/appenders/async/src/state.rs +++ b/appenders/async/src/state.rs @@ -27,23 +27,27 @@ pub(crate) struct AsyncState(ArcSwapOption); #[derive(Debug)] struct State { + overflow: Overflow, sender: Sender, handle: JoinHandle<()>, } impl AsyncState { - pub(crate) fn new(sender: Sender, handle: JoinHandle<()>) -> Self { - let state = State { sender, handle }; - Self(ArcSwapOption::from(Some(Arc::new(state)))) + pub(crate) fn new(overflow: Overflow, sender: Sender, handle: JoinHandle<()>) -> Self { + Self(ArcSwapOption::from(Some(Arc::new(State { + overflow, + sender, + handle, + })))) } - pub(crate) fn send_task(&self, task: Task, overflow: Overflow) -> Result<(), Error> { + pub(crate) fn send_task(&self, task: Task) -> Result<(), Error> { let state = self.0.load(); // SAFETY: state is always Some before dropped. let state = state.as_ref().unwrap(); let sender = &state.sender; - match overflow { + match state.overflow { Overflow::Block => sender.send(task).map_err(|err| { Error::new(match err.0 { Task::Log { .. } => "failed to send log task to async appender", @@ -66,7 +70,11 @@ impl AsyncState { pub(crate) fn destroy(&self) { if let Some(state) = self.0.swap(None) { // SAFETY: state has always one strong count before swapped. - let State { sender, handle } = Arc::into_inner(state).unwrap(); + let State { + overflow: _, + sender, + handle, + } = Arc::into_inner(state).unwrap(); // drop our sender, threads will break the loop after receiving and processing drop(sender); diff --git a/appenders/async/src/worker.rs b/appenders/async/src/worker.rs index a3f2f34..294695e 100644 --- a/appenders/async/src/worker.rs +++ b/appenders/async/src/worker.rs @@ -12,9 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use crossbeam_channel::Receiver; +use logforth_core::Append; use logforth_core::Diagnostic; use logforth_core::Error; use logforth_core::Trap; @@ -24,25 +23,34 @@ use logforth_core::kv::Visitor; use crate::Task; pub(crate) struct Worker { + appends: Vec>, receiver: Receiver, - trap: Arc, + trap: Box, } impl Worker { - pub(crate) fn new(receiver: Receiver, trap: Arc) -> Self { - Self { receiver, trap } + pub(crate) fn new( + appends: Vec>, + receiver: Receiver, + trap: Box, + ) -> Self { + Self { + appends, + receiver, + trap, + } } pub(crate) fn run(self) { - let Self { receiver, trap } = self; + let Self { + appends, + receiver, + trap, + } = self; while let Ok(task) = receiver.recv() { match task { - Task::Log { - appends, - record, - diags, - } => { + Task::Log { record, diags } => { let diags: &[Box] = if diags.is_empty() { &[] } else { @@ -51,18 +59,23 @@ impl Worker { let record = record.as_record(); for append in appends.iter() { if let Err(err) = append.append(&record, diags) { - let err = Error::new("failed to append record").set_source(err); + let err = Error::new("failed to append record").with_source(err); trap.trap(&err); } } } - Task::Flush { appends } => { + Task::Flush { done } => { + let mut error = None; for append in appends.iter() { if let Err(err) = append.flush() { - let err = Error::new("failed to flush").set_source(err); - trap.trap(&err); + error = Some( + error + .unwrap_or_else(|| Error::new("failed to flush appender")) + .with_source(err), + ); } } + let _ = done.send(error); } } } diff --git a/appenders/async/tests/flushes.rs b/appenders/async/tests/flushes.rs new file mode 100644 index 0000000..3b4ffe2 --- /dev/null +++ b/appenders/async/tests/flushes.rs @@ -0,0 +1,107 @@ +// Copyright 2024 FastLabs Developers +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Tests for asynchronous appenders wait on flushes. + +use std::sync::Arc; +use std::sync::Barrier; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; + +use logforth_append_async::AsyncBuilder; +use logforth_core::Append; +use logforth_core::Diagnostic; +use logforth_core::Error; +use logforth_core::Trap; +use logforth_core::record::Record; + +#[derive(Debug)] +struct BarrierAppend { + started: Arc, + barrier: Arc, +} + +impl Append for BarrierAppend { + fn append(&self, _: &Record, _: &[Box]) -> Result<(), Error> { + Ok(()) + } + + fn flush(&self) -> Result<(), Error> { + self.started.store(true, Ordering::SeqCst); + self.barrier.wait(); + Ok(()) + } +} + +#[derive(Debug)] +struct FailingFlush; + +impl Append for FailingFlush { + fn append(&self, _: &Record, _: &[Box]) -> Result<(), Error> { + Ok(()) + } + + fn flush(&self) -> Result<(), Error> { + Err(Error::new("flush failed")) + } +} + +#[derive(Debug)] +struct NoopTrap; + +impl Trap for NoopTrap { + fn trap(&self, _: &Error) {} +} + +#[test] +fn flush_waits_for_worker_completion() { + let started = Arc::new(AtomicBool::new(false)); + let barrier = Arc::new(Barrier::new(2)); + + let append = BarrierAppend { + started: started.clone(), + barrier: barrier.clone(), + }; + + let async_append = AsyncBuilder::new("async-flush-wait").append(append).build(); + + let barrier_for_main = barrier.clone(); + let flush_handle = std::thread::spawn(move || async_append.flush()); + + while !started.load(Ordering::SeqCst) { + std::thread::yield_now(); + } + + assert!(!flush_handle.is_finished()); + + barrier_for_main.wait(); + + flush_handle + .join() + .expect("flush thread panicked") + .expect("flush should succeed"); +} + +#[test] +fn flush_propagates_errors() { + let async_append = AsyncBuilder::new("async-flush-error") + .trap(NoopTrap) + .append(FailingFlush) + .build(); + + let err = async_append.flush().unwrap_err(); + let err = err.to_string(); + assert!(err.contains("failed to flush")); + assert!(err.contains("flush failed")); +} diff --git a/appenders/fastrace/src/lib.rs b/appenders/fastrace/src/lib.rs index e9f44d0..307082c 100644 --- a/appenders/fastrace/src/lib.rs +++ b/appenders/fastrace/src/lib.rs @@ -83,12 +83,6 @@ impl Append for FastraceEvent { fastrace::flush(); Ok(()) } - - fn exit(&self) -> Result<(), Error> { - // do nothing - because fastrace::flush uses thread-local storage internally, - // which is not supported in atexit callbacks. - Ok(()) - } } struct KvCollector { diff --git a/appenders/file/src/rolling.rs b/appenders/file/src/rolling.rs index cea0a1e..82035da 100644 --- a/appenders/file/src/rolling.rs +++ b/appenders/file/src/rolling.rs @@ -42,7 +42,7 @@ pub struct RollingFileWriter { impl Drop for RollingFileWriter { fn drop(&mut self) { if let Err(err) = self.writer.flush() { - let err = Error::new("failed to flush file writer on dropped").set_source(err); + let err = Error::new("failed to flush file writer on dropped").with_source(err); self.state.trap.trap(&err); } } @@ -237,7 +237,7 @@ impl State { let now = clock.now(); let log_dir = dir.as_ref().to_path_buf(); fs::create_dir_all(&log_dir) - .map_err(|err| Error::new("failed to create log directory").set_source(err))?; + .map_err(|err| Error::new("failed to create log directory").with_source(err))?; let mut state = State { log_dir, @@ -284,7 +284,7 @@ impl State { OpenOptions::new() .append(true) .open(&filename) - .map_err(|err| Error::new("failed to open current log").set_source(err))? + .map_err(|err| Error::new("failed to open current log").with_source(err))? } } }; @@ -306,7 +306,7 @@ impl State { .write(true) .create_new(true) .open(&filename) - .map_err(|err| Error::new("failed to create log file").set_source(err)) + .map_err(|err| Error::new("failed to create log file").with_source(err)) } fn join_date(&self, date: &Zoned, cnt: usize) -> PathBuf { @@ -332,7 +332,7 @@ impl State { "failed to read log dir: {}", self.log_dir.display() )) - .set_source(err) + .with_source(err) })?; let files = read_dir @@ -413,7 +413,7 @@ impl State { let filepath = &file.filepath; fs::remove_file(filepath).map_err(|err| { Error::new(format!("failed to remove old log: {}", filepath.display())) - .set_source(err) + .with_source(err) })?; } @@ -434,7 +434,7 @@ impl State { for (old, new) in renames.iter().rev() { fs::rename(old, new).map_err(|err| { - Error::new(format!("failed to rotate log: {}", old.display())).set_source(err) + Error::new(format!("failed to rotate log: {}", old.display())).with_source(err) })? } @@ -445,12 +445,12 @@ impl State { "failed to archive log: {}", current_filepath.display() )) - .set_source(err) + .with_source(err) })?; if let Some(max_files) = self.max_files { if let Err(err) = self.delete_oldest_logs(max_files.get()) { - let err = Error::new("failed to delete oldest logs").set_source(err); + let err = Error::new("failed to delete oldest logs").with_source(err); self.trap.trap(&err); } } @@ -462,13 +462,13 @@ impl State { match self.rotate_log_writer(now) { Ok(new_file) => { if let Err(err) = file.flush() { - let err = Error::new("failed to flush previous writer").set_source(err); + let err = Error::new("failed to flush previous writer").with_source(err); self.trap.trap(&err); } *file = new_file; } Err(err) => { - let err = Error::new("failed to rotate log writer").set_source(err); + let err = Error::new("failed to rotate log writer").with_source(err); self.trap.trap(&err); } } diff --git a/appenders/journald/src/lib.rs b/appenders/journald/src/lib.rs index 5d4ad24..fad3c4b 100644 --- a/appenders/journald/src/lib.rs +++ b/appenders/journald/src/lib.rs @@ -229,7 +229,7 @@ impl Journald { #[cfg(target_os = "linux")] fn send_large_payload(&self, payload: &[u8]) -> Result { memfd::send_large_payload(&self.socket, payload) - .map_err(|err| Error::new("failed to send payload via memfd").set_source(err)) + .map_err(|err| Error::new("failed to send payload via memfd").with_source(err)) } } diff --git a/appenders/opentelemetry/src/lib.rs b/appenders/opentelemetry/src/lib.rs index 4a11ebe..bb92350 100644 --- a/appenders/opentelemetry/src/lib.rs +++ b/appenders/opentelemetry/src/lib.rs @@ -282,7 +282,7 @@ impl Append for OpentelemetryLog { fn flush(&self) -> Result<(), Error> { self.provider .force_flush() - .map_err(|err| Error::new("failed to flush records").set_source(err)) + .map_err(|err| Error::new("failed to flush records").with_source(err)) } } diff --git a/core/src/append/mod.rs b/core/src/append/mod.rs index ac6e796..573e80a 100644 --- a/core/src/append/mod.rs +++ b/core/src/append/mod.rs @@ -34,17 +34,6 @@ pub trait Append: fmt::Debug + Send + Sync + 'static { /// Flush any buffered records. fn flush(&self) -> Result<(), Error>; - - /// Perform any cleanup work before the program exits. - /// - /// Default to call `flush`. - /// - /// This is typically called within a global logger during program exits. - /// If it is not in a global logger, the drop glue should perform necessary - /// cleanup. - fn exit(&self) -> Result<(), Error> { - self.flush() - } } impl From for Box { diff --git a/core/src/error.rs b/core/src/error.rs index c15aa9f..31ac0c1 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -18,7 +18,7 @@ use std::io; /// The error struct of logforth. pub struct Error { message: String, - source: Option, + sources: Vec, context: Vec<(&'static str, String)>, } @@ -40,8 +40,15 @@ impl fmt::Display for Error { write!(f, " }}")?; } - if let Some(source) = &self.source { - write!(f, ", source: {source}")?; + if !self.sources.is_empty() { + write!(f, ", sources: [")?; + for (i, source) in self.sources.iter().enumerate() { + if i > 0 { + write!(f, ", ")?; + } + write!(f, "{source}")?; + } + write!(f, "]")?; } Ok(()) @@ -55,7 +62,7 @@ impl fmt::Debug for Error { let mut de = f.debug_struct("Error"); de.field("message", &self.message); de.field("context", &self.context); - de.field("source", &self.source); + de.field("sources", &self.sources); return de.finish(); } @@ -69,10 +76,12 @@ impl fmt::Debug for Error { writeln!(f, " {k}: {v}")?; } } - if let Some(source) = &self.source { + if !self.sources.is_empty() { writeln!(f)?; - writeln!(f, "Source:")?; - writeln!(f, " {source:#}")?; + writeln!(f, "Sources:")?; + for source in self.sources.iter() { + writeln!(f, " {source:#}")?; + } } Ok(()) @@ -81,7 +90,7 @@ impl fmt::Debug for Error { impl std::error::Error for Error { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - self.source.as_ref().map(|v| v.as_ref()) + self.sources.first().map(|v| v.as_ref()) } } @@ -90,36 +99,35 @@ impl Error { pub fn new(message: impl Into) -> Self { Self { message: message.into(), - source: None, + sources: vec![], context: vec![], } } - /// Add more context in error. + /// Add one more context in error. pub fn with_context(mut self, key: &'static str, value: impl ToString) -> Self { self.context.push((key, value.to_string())); self } - /// Set source for error. - /// - /// # Notes - /// - /// If the source has been set, we will raise a panic here. - pub fn set_source(mut self, src: impl Into) -> Self { - debug_assert!(self.source.is_none(), "the source error has been set"); - - self.source = Some(src.into()); + /// Add one more source in error. + pub fn with_source(mut self, src: impl Into) -> Self { + self.sources.push(src.into()); self } + /// Return an iterator over all sources of this error. + pub fn sources(&self) -> impl ExactSizeIterator { + self.sources.iter().map(|v| v.as_ref()) + } + /// Default constructor for [`Error`] from [`io::Error`]. pub fn from_io_error(err: io::Error) -> Error { - Error::new("failed to perform io").set_source(err) + Error::new("failed to perform io").with_source(err) } /// Default constructor for [`Error`] from [`fmt::Error`]. pub fn from_fmt_error(err: fmt::Error) -> Error { - Error::new("failed to perform format").set_source(err) + Error::new("failed to perform format").with_source(err) } } diff --git a/core/src/logger/log_impl.rs b/core/src/logger/log_impl.rs index 9e9a336..d4a0860 100644 --- a/core/src/logger/log_impl.rs +++ b/core/src/logger/log_impl.rs @@ -14,7 +14,6 @@ use std::io::Write; use std::panic; -use std::sync::Once; use std::sync::OnceLock; use crate::Append; @@ -40,49 +39,7 @@ pub fn default_logger() -> &'static Logger { /// If a default logger has already been set, the function returns the provided logger /// as an error. pub fn set_default_logger(logger: Logger) -> Result<(), Logger> { - static ATEXIT_CALLBACK: Once = Once::new(); - - DEFAULT_LOGGER.set(logger)?; - ATEXIT_CALLBACK.call_once(flush_default_logger_at_exit); - Ok(()) -} - -fn flush_default_logger_at_exit() { - // Rust never calls `drop` for static variables. - // - // Setting up an exit handler gives us a chance to flush the default logger - // once at the program exit, thus we don't lose the last logs. - - extern "C" fn handler() { - if let Some(default_logger) = DEFAULT_LOGGER.get() { - default_logger.exit(); - } - } - - #[must_use] - fn try_atexit() -> bool { - use std::os::raw::c_int; - - unsafe extern "C" { - fn atexit(cb: extern "C" fn()) -> c_int; - } - - (unsafe { atexit(handler) }) == 0 - } - - fn hook_panic() { - let previous_hook = panic::take_hook(); - - panic::set_hook(Box::new(move |info| { - handler(); - previous_hook(info); - })); - } - - if !try_atexit() { - // if we failed to register the `atexit` handler, at least we hook into panic - hook_panic(); - } + DEFAULT_LOGGER.set(logger) } /// A logger that dispatches log records to one or more dispatcher. @@ -122,15 +79,6 @@ impl Logger { } } } - - /// Perform any cleanup work before the program exits. - pub fn exit(&self) { - for dispatch in &self.dispatches { - for err in dispatch.exit() { - handle_exit_error(&err); - } - } - } } /// A grouped set of appenders and filters. @@ -208,16 +156,6 @@ impl Dispatch { } errors } - - fn exit(&self) -> Vec { - let mut errors = vec![]; - for append in &self.appends { - if let Err(err) = append.exit() { - errors.push(err); - } - } - errors - } } fn handle_log_error(record: &Record, error: &Error) { @@ -270,23 +208,3 @@ Error performing stderr logging after error occurred during regular flush. "###, ); } - -fn handle_exit_error(error: &Error) { - let Err(fallback_error) = write!( - std::io::stderr(), - r###" -Error perform exit. - Error: {error:?} -"###, - ) else { - return; - }; - - panic!( - r###" -Error performing stderr logging after error occurred during atexit. - Error: {error:?} - Fallback error: {fallback_error} -"###, - ); -} diff --git a/logforth/Cargo.toml b/logforth/Cargo.toml index 3a3cdbb..5feb1df 100644 --- a/logforth/Cargo.toml +++ b/logforth/Cargo.toml @@ -90,11 +90,6 @@ tokio = { workspace = true, features = ["full"] } [lints] workspace = true -[[test]] -harness = false -name = "global_async_sink" -required-features = ["starter-log", "append-async"] - [[example]] doc-scrape-examples = true name = "asynchronous" diff --git a/logforth/examples/asynchronous.rs b/logforth/examples/asynchronous.rs index c36aabc..88c9892 100644 --- a/logforth/examples/asynchronous.rs +++ b/logforth/examples/asynchronous.rs @@ -37,4 +37,7 @@ fn main() { log::info!("Hello single info!"); log::debug!("Hello single debug!"); log::trace!("Hello single trace!"); + + // ensure all async events buffered are written out + logforth::core::default_logger().flush(); } diff --git a/logforth/tests/global_async_sink.rs b/logforth/tests/global_async_sink.rs deleted file mode 100644 index 72faad5..0000000 --- a/logforth/tests/global_async_sink.rs +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright 2024 FastLabs Developers -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! This case ensures that the asynchronous logger flushes correctly at program exit. - -// This refers to https://github.com/SpriteOvO/spdlog-rs/issues/64 - -use std::fmt::Write; -use std::os::raw::c_int; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; - -use logforth_append_async::AsyncBuilder; -use logforth_core::Append; -use logforth_core::Diagnostic; -use logforth_core::Error; -use logforth_core::record::LevelFilter; -use logforth_core::record::Record; - -static IS_LOGGED: AtomicBool = AtomicBool::new(false); -static IS_FLUSHED: AtomicBool = AtomicBool::new(false); - -#[derive(Debug)] -struct SetFlags; - -impl Append for SetFlags { - fn append(&self, _: &Record, _: &[Box]) -> Result<(), Error> { - IS_LOGGED.store(true, Ordering::SeqCst); - Ok(()) - } - - fn flush(&self) -> Result<(), Error> { - // assert that the record has been logged before flushing - assert!(IS_LOGGED.load(Ordering::SeqCst)); - IS_FLUSHED.store(true, Ordering::SeqCst); - Ok(()) - } -} - -fn run_test() { - { - extern "C" fn check() { - // assert that `Async` appender in the default logger will be flushed correctly - // and will not panic. - assert!(IS_FLUSHED.load(Ordering::SeqCst)); - } - - // set up `atexit` to check the flag at the end of the program - unsafe extern "C" { - fn atexit(cb: extern "C" fn()) -> c_int; - } - - assert_eq!(unsafe { atexit(check) }, 0); - - let asynchronous = AsyncBuilder::new("async-appender").append(SetFlags).build(); - - logforth::starter_log::builder() - .dispatch(|d| d.filter(LevelFilter::All).append(asynchronous)) - .apply(); - } - - log::info!("hello async sink"); -} - -fn main() { - // This is a flaky test, it only has a certain probability of failing, - // so we run it multiple times to make sure it's really working properly. - { - let mut captured_output = String::new(); - let args = std::env::args().collect::>(); - - let is_parent = args.iter().all(|arg| arg != "child"); - if is_parent { - for i in 0..1000 { - let output = std::process::Command::new(&args[0]) - .arg("child") - .stderr(std::process::Stdio::piped()) - .output() - .unwrap(); - - let success = output.status.success(); - writeln!( - captured_output, - "Attempt #{i} = {}", - if success { "ok" } else { "failed!" } - ) - .unwrap(); - - if !success { - eprintln!("{captured_output}"); - - let stderr = String::from_utf8_lossy(&output.stderr).lines().fold( - String::new(), - |mut contents, line| { - writeln!(&mut contents, "> {line}").unwrap(); - contents - }, - ); - - eprintln!("stderr of the failed attempt:\n{stderr}"); - panic!("test failed"); - } - } - return; - } else { - assert_eq!(args[1], "child"); - } - } - - // Run the test after leaving the scope, so the main function ends - // without dropping additional variables, thus exiting faster. This - // should increase the probability of reproducing the error. - run_test(); -}