Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ All notable changes to this project will be documented in this file.

### Breaking changes

* Bump minimum supported Rust version (MSRV) to 1.89.0.
* `Append` has no more `exit` method. Users should compose `logforth::core::default_logger().flush()` with their own graceful shutdown logic.
* `Async` appender's `flush` method is now blocking until all buffered logs are flushed by worker threads. Any errors during flushing will be propagated back to the `flush` caller.
* `Record::payload` is now `std::fmt::Arguments` instead of `Cow<'static, str>`.
* `RecordOwned::as_record` has been removed; use `RecordOwned::with` instead. (This is a limitation of Rust as described [here](https://github.com/rust-lang/rust/issues/92698#issuecomment-3311144848).)

## [0.29.1] 2025-11-03

Expand Down
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ homepage = "https://github.com/fast/logforth"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/fast/logforth"
rust-version = "1.85.0"
rust-version = "1.89.0"

[workspace.dependencies]
# Workspace dependencies
Expand All @@ -57,7 +57,6 @@ anyhow = { version = "1.0" }
arc-swap = { version = "1.7.1" }
clap = { version = "4.5.49", features = ["derive"] }
colored = { version = "3.0" }
crossbeam-channel = { version = "0.5.15" }
fastrace = { version = "0.7" }
fasyslog = { version = "1.0.0" }
insta = { version = "1.43.2" }
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

[![Crates.io][crates-badge]][crates-url]
[![Documentation][docs-badge]][docs-url]
[![MSRV 1.80][msrv-badge]](https://www.whatrustisit.com)
[![MSRV 1.89][msrv-badge]](https://www.whatrustisit.com)
[![Apache 2.0 licensed][license-badge]][license-url]
[![Build Status][actions-badge]][actions-url]

[crates-badge]: https://img.shields.io/crates/v/logforth.svg
[crates-url]: https://crates.io/crates/logforth
[docs-badge]: https://docs.rs/logforth/badge.svg
[msrv-badge]: https://img.shields.io/badge/MSRV-1.80-green?logo=rust
[msrv-badge]: https://img.shields.io/badge/MSRV-1.89-green?logo=rust
[docs-url]: https://docs.rs/logforth
[license-badge]: https://img.shields.io/crates/l/logforth
[license-url]: LICENSE
Expand Down Expand Up @@ -220,7 +220,7 @@ Components are organized into several crates:

## Minimum Rust version policy

This crate is built against the latest stable release, and its minimum supported rustc version is 1.85.0.
This crate is built against the latest stable release, and its minimum supported rustc version is 1.89.0.

The policy is that the minimum Rust version required to use this crate can be increased in minor version updates. For example, if Logforth 1.0 requires Rust 1.60.0, then Logforth 1.0.z for all values of z will also require Rust 1.60.0 or newer. However, Logforth 1.y for y > 0 may require a newer minimum version of Rust.

Expand Down
1 change: 0 additions & 1 deletion appenders/async/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
arc-swap = { workspace = true }
crossbeam-channel = { workspace = true }
logforth-core = { workspace = true }
oneshot = { workspace = true }

Expand Down
7 changes: 2 additions & 5 deletions appenders/async/src/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use logforth_core::trap::BestEffortTrap;

use crate::Overflow;
use crate::Task;
use crate::channel::channel;
use crate::state::AsyncState;
use crate::worker::Worker;

Expand Down Expand Up @@ -145,11 +146,7 @@ impl AsyncBuilder {
overflow,
} = self;

let (sender, receiver) = match buffered_lines_limit {
Some(limit) => crossbeam_channel::bounded(limit),
None => crossbeam_channel::unbounded(),
};

let (sender, receiver) = channel(buffered_lines_limit);
let worker = Worker::new(appends, receiver, trap);
let thread_handle = std::thread::Builder::new()
.name(thread_name)
Expand Down
63 changes: 63 additions & 0 deletions appenders/async/src/channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2024 FastLabs Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::mpsc;

pub(crate) fn channel<T>(bound: Option<usize>) -> (Sender<T>, Receiver<T>) {
match bound {
Some(bound) => {
let (tx, rx) = mpsc::sync_channel(bound);
(Sender::Bounded(tx), rx)
}
None => {
let (tx, rx) = mpsc::channel();
(Sender::Unbounded(tx), rx)
}
}
}

pub(crate) type Receiver<T> = mpsc::Receiver<T>;

#[derive(Clone)]
pub(crate) enum Sender<T> {
Unbounded(mpsc::Sender<T>),
Bounded(mpsc::SyncSender<T>),
}

impl<T> std::fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Sender::Unbounded(tx) => tx.fmt(f),
Sender::Bounded(tx) => tx.fmt(f),
}
}
}

impl<T> Sender<T> {
pub(crate) fn send(&self, value: T) -> Result<(), mpsc::SendError<T>> {
match self {
Sender::Unbounded(s) => s.send(value),
Sender::Bounded(s) => s.send(value),
}
}

pub(crate) fn try_send(&self, value: T) -> Result<(), mpsc::TrySendError<T>> {
match self {
Sender::Unbounded(s) => s
.send(value)
.map_err(|e| mpsc::TrySendError::Disconnected(e.0)),
Sender::Bounded(s) => s.try_send(value),
}
}
}
1 change: 1 addition & 0 deletions appenders/async/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use logforth_core::kv;
use logforth_core::record::RecordOwned;

mod append;
mod channel;
mod state;
mod worker;

Expand Down
15 changes: 7 additions & 8 deletions appenders/async/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
// limitations under the License.

use std::sync::Arc;
use std::sync::mpsc;
use std::thread::JoinHandle;

use arc_swap::ArcSwapOption;
use crossbeam_channel::Sender;
use logforth_core::Error;

use crate::Overflow;
use crate::Task;
use crate::channel::Sender;

#[derive(Debug)]
pub(crate) struct AsyncState(ArcSwapOption<State>);
Expand Down Expand Up @@ -56,13 +57,11 @@ impl AsyncState {
}),
Overflow::DropIncoming => match sender.try_send(task) {
Ok(()) => Ok(()),
Err(crossbeam_channel::TrySendError::Full(_)) => Ok(()),
Err(crossbeam_channel::TrySendError::Disconnected(task)) => {
Err(Error::new(match task {
Task::Log { .. } => "failed to send log task to async appender",
Task::Flush { .. } => "failed to send flush task to async appender",
}))
}
Err(mpsc::TrySendError::Full(_)) => Ok(()),
Err(mpsc::TrySendError::Disconnected(task)) => Err(Error::new(match task {
Task::Log { .. } => "failed to send log task to async appender",
Task::Flush { .. } => "failed to send flush task to async appender",
})),
},
}
}
Expand Down
16 changes: 9 additions & 7 deletions appenders/async/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crossbeam_channel::Receiver;
use logforth_core::Append;
use logforth_core::Diagnostic;
use logforth_core::Error;
Expand All @@ -21,6 +20,7 @@ use logforth_core::kv;
use logforth_core::kv::Visitor;

use crate::Task;
use crate::channel::Receiver;

pub(crate) struct Worker {
appends: Vec<Box<dyn Append>>,
Expand Down Expand Up @@ -56,13 +56,15 @@ impl Worker {
} else {
&[Box::new(OwnedDiagnostic(diags))]
};
let record = record.as_record();
for append in appends.iter() {
if let Err(err) = append.append(&record, diags) {
let err = Error::new("failed to append record").with_source(err);
trap.trap(&err);

record.with(|record| {
for append in appends.iter() {
if let Err(err) = append.append(&record, diags) {
let err = Error::new("failed to append record").with_source(err);
trap.trap(&err);
}
}
}
});
}
Task::Flush { done } => {
let mut error = None;
Expand Down
6 changes: 5 additions & 1 deletion appenders/fastrace/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ pub struct FastraceEvent {}

impl Append for FastraceEvent {
fn append(&self, record: &Record, diags: &[Box<dyn Diagnostic>]) -> Result<(), Error> {
let message = record.payload().to_owned();
let message = if let Some(msg) = record.payload_static() {
Cow::Borrowed(msg)
} else {
Cow::Owned(record.payload().to_string())
};

let mut collector = KvCollector { kv: Vec::new() };
record.key_values().visit(&mut collector)?;
Expand Down
20 changes: 10 additions & 10 deletions appenders/file/src/rolling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,11 @@ impl State {
} else {
state.current_filesize = last.metadata.len() as usize;

if let Ok(mtime) = last.metadata.modified() {
if let Ok(mtime) = Zoned::try_from(mtime) {
state.next_date_timestamp = state.rotation.next_date_timestamp(&mtime);
state.this_date_timestamp = mtime;
}
if let Ok(mtime) = last.metadata.modified()
&& let Ok(mtime) = Zoned::try_from(mtime)
{
state.next_date_timestamp = state.rotation.next_date_timestamp(&mtime);
state.this_date_timestamp = mtime;
}

// continue to use the existing current log file
Expand Down Expand Up @@ -448,11 +448,11 @@ impl State {
.with_source(err)
})?;

if let Some(max_files) = self.max_files {
if let Err(err) = self.delete_oldest_logs(max_files.get()) {
let err = Error::new("failed to delete oldest logs").with_source(err);
self.trap.trap(&err);
}
if let Some(max_files) = self.max_files
&& let Err(err) = self.delete_oldest_logs(max_files.get())
{
let err = Error::new("failed to delete oldest logs").with_source(err);
self.trap.trap(&err);
}

self.create_log_writer()
Expand Down
4 changes: 2 additions & 2 deletions appenders/file/tests/global_file_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn test_global_file_count_limit() {
writer
.append(
&Record::builder()
.payload(format!("Log entry {}: {}\n", i, "A".repeat(50)))
.payload(format_args!("Log entry {}: {}\n", i, "A".repeat(50)))
.build(),
&[],
)
Expand Down Expand Up @@ -156,7 +156,7 @@ fn create_logs(dir: &Path, max_files: usize, max_size: usize, filename: &str, co
writer
.append(
&Record::builder()
.payload(format!(
.payload(format_args!(
"Prefix {}, Log {}: {}\n",
filename,
i,
Expand Down
8 changes: 8 additions & 0 deletions appenders/journald/src/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ impl PutAsFieldValue for &str {
}
}

impl PutAsFieldValue for std::fmt::Arguments<'_> {
fn put_field_value(self, buffer: &mut Vec<u8>) {
// SAFETY: no more than an allocate-less version
// buffer.extend_from_slice(format!("{}", self))
write!(buffer, "{self}").unwrap();
}
}

impl PutAsFieldValue for Value<'_> {
fn put_field_value(self, buffer: &mut Vec<u8>) {
// SAFETY: no more than an allocate-less version
Expand Down
2 changes: 1 addition & 1 deletion appenders/opentelemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ impl Append for OpentelemetryLog {
} else if let Some(payload) = record.payload_static() {
log_record.set_body(AnyValue::from(payload));
} else {
log_record.set_body(AnyValue::from(record.payload().to_owned()));
log_record.set_body(AnyValue::from(record.payload().to_string()));
}

if let Some(module_path) = record.module_path_static() {
Expand Down
6 changes: 1 addition & 5 deletions bridges/log/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,7 @@ fn forward_log(logger: &Logger, record: &Record) {
};

// payload
builder = if let Some(payload) = record.args().as_str() {
builder.payload(payload)
} else {
builder.payload(record.args().to_string())
};
builder = builder.payload(*record.args());

// key-values
let mut kvs = Vec::new();
Expand Down
12 changes: 7 additions & 5 deletions core/src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use value_bag::OwnedValueBag;
use value_bag::ValueBag;

use crate::Error;
use crate::str::OwnedStr;
use crate::str::RefStr;

/// A visitor to walk through key-value pairs.
Expand Down Expand Up @@ -63,7 +62,7 @@ impl<'a> Key<'a> {

/// Convert to an owned key.
pub fn to_owned(&self) -> KeyOwned {
KeyOwned(self.0.into_owned())
KeyOwned(self.0.into_cow_static())
}

/// Convert to a `Cow` str.
Expand All @@ -82,12 +81,15 @@ pub type ValueOwned = OwnedValueBag;

/// An owned key in a key-value pair.
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct KeyOwned(OwnedStr);
pub struct KeyOwned(Cow<'static, str>);

impl KeyOwned {
/// Create a `Key` ref.
pub fn by_ref(&self) -> Key<'_> {
Key(self.0.by_ref())
Key(match &self.0 {
Cow::Borrowed(s) => RefStr::Static(s),
Cow::Owned(s) => RefStr::Borrowed(s),
})
}
}

Expand Down Expand Up @@ -143,7 +145,7 @@ impl<'a> KeyValues<'a> {
}
}),
KeyValuesState::Owned(p) => p.iter().find_map(|(k, v)| {
if k.0.get() != key {
if k.0.as_ref() != key {
None
} else {
Some(v.by_ref())
Expand Down
4 changes: 3 additions & 1 deletion core/src/logger/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ impl LoggerBuilder {
/// use logforth_core::record::Record;
///
/// let l = logforth_core::builder().build();
/// let r = Record::builder().payload("hello world!").build();
/// let r = Record::builder()
/// .payload(format_args!("hello world!"))
/// .build();
/// l.log(&r);
/// ```
pub fn build(self) -> Logger {
Expand Down
Loading