4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

44 changes: 41 additions & 3 deletions Dockerfile
@@ -1,5 +1,15 @@
FROM docker.io/rust:1.90.0-bookworm AS chef
RUN cargo install cargo-chef
WORKDIR /app

FROM chef AS planner
COPY . .
RUN cargo chef prepare --recipe-path recipe.json

# Update the rust version in-sync with the version in rust-toolchain.toml
FROM docker.io/rust:1.90.0-bookworm AS builder
FROM chef AS builder
COPY --from=planner /app/recipe.json recipe.json
RUN cargo chef cook --release --recipe-path recipe.json

RUN apt-get update && apt-get install -y \
protobuf-compiler fuse3 libfuse3-dev \
@@ -10,9 +20,37 @@ COPY . /app
RUN cargo build --release --locked --bin lading --features logrotate_fs

FROM docker.io/debian:bookworm-20241202-slim
RUN apt-get update && apt-get install -y libfuse3-dev=3.14.0-4 fuse3=3.14.0-4 && rm -rf /var/lib/apt/lists/*
RUN apt-get update && apt-get install -y \
libfuse3-dev=3.14.0-4 \
fuse3=3.14.0-4 \
tcpdump \
procps \
lsof \
net-tools \
iproute2 \
curl \
strace \
&& rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/target/release/lading /usr/bin/lading

# Create wrapper script that starts tcpdump before lading
RUN echo '#!/bin/bash\n\
set -e\n\
\n\
# Ensure the capture directory exists (a volume mount may also provide it)\n\
mkdir -p /captures\n\
\n\
echo "Starting tcpdump on port 11538..."\n\
# Full-packet capture with large buffer to avoid drops/truncation\n\
tcpdump -i any -nn -s0 -B 512000 -w /captures/grpc-traffic.pcap tcp port 11538 &\n\
TCPDUMP_PID=$!\n\
echo $TCPDUMP_PID > /tmp/tcpdump.pid\n\
echo "tcpdump started with PID $TCPDUMP_PID, capturing to /captures/grpc-traffic.pcap"\n\
\n\
# Give tcpdump time to initialize\n\
sleep 2\n\
\n\
# Start lading with all arguments\n\
exec /usr/bin/lading "$@"\n\
' > /usr/bin/lading-wrapper && chmod +x /usr/bin/lading-wrapper

# smoke test
RUN ["/usr/bin/lading", "--help"]
ENTRYPOINT ["/usr/bin/lading"]
ENTRYPOINT ["/usr/bin/lading-wrapper"]
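
Note: the wrapper writes its capture to /captures/grpc-traffic.pcap, so the image expects that directory at runtime; bind-mounting a host directory over /captures (for example with docker run's -v flag) is the natural way to retrieve the pcap after the container exits.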
60 changes: 59 additions & 1 deletion lading/src/generator/file_gen/traditional.rs
@@ -30,6 +30,7 @@ use tokio::{
fs,
io::{AsyncWriteExt, BufWriter},
task::{JoinError, JoinSet},
time::{Duration, Instant},
};
use tracing::{error, info};

@@ -120,6 +121,18 @@ pub struct Config {
rotate: bool,
/// The load throttle configuration
pub throttle: Option<BytesThrottleConfig>,
/// Optional fixed interval between blocks. When set, the generator waits
/// this duration before emitting the next block, regardless of byte size.
pub block_interval_millis: Option<u64>,
/// Flush after each block. Useful when block intervals are large and the
/// buffered writer would otherwise delay writes to disk.
#[serde(default)]
pub flush_each_block: bool,
/// Optional starting line offset into the block cache. The generator
/// advances through blocks until the cumulative line count reaches this
/// value, then begins emitting from that block. Blocks that report no
/// data point count are treated as containing a single line.
pub start_line_index: Option<u64>,
}

#[derive(Debug)]
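
For reference, a minimal sketch of how the three new knobs deserialize, using a trimmed stand-in struct rather than the full `Config` (field names and serde attributes match the diff; `serde` with derive and `serde_yaml` are assumed, since lading configs are YAML):

```rust
use serde::Deserialize;

// Stand-in carrying only the fields added in this diff; the real Config
// has many more (root, rotate, throttle, ...).
#[derive(Debug, Deserialize)]
struct PacingKnobs {
    block_interval_millis: Option<u64>,
    #[serde(default)]
    flush_each_block: bool,
    start_line_index: Option<u64>,
}

fn main() {
    let yaml = "
block_interval_millis: 1000
flush_each_block: true
start_line_index: 500
";
    let knobs: PacingKnobs = serde_yaml::from_str(yaml).expect("valid config");
    assert_eq!(knobs.block_interval_millis, Some(1000));
    assert!(knobs.flush_each_block);
    assert_eq!(knobs.start_line_index, Some(500));
}
```

Omitting any of the three keys yields `None` / `false`, so existing configs are unaffected.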
@@ -195,6 +208,9 @@ impl Server {
file_index: Arc::clone(&file_index),
rotate: config.rotate,
shutdown: shutdown.clone(),
block_interval: config.block_interval_millis.map(Duration::from_millis),
flush_each_block: config.flush_each_block,
start_line_index: config.start_line_index.unwrap_or(0),
};

handles.spawn(child.spin());
@@ -269,6 +285,9 @@ struct Child {
rotate: bool,
file_index: Arc<AtomicU32>,
shutdown: lading_signal::Watcher,
block_interval: Option<Duration>,
flush_each_block: bool,
start_line_index: u64,
}

impl Child {
@@ -300,16 +319,52 @@ impl Child {
);

let mut handle = self.block_cache.handle();

if self.start_line_index > 0 {
let mut remaining = self.start_line_index;
// Walk blocks until we reach or surpass the requested line offset.
// If metadata is missing, assume at least one data point to ensure progress.
loop {
let md = self.block_cache.peek_next_metadata(&handle);
let mut lines = md.data_points.unwrap_or(1);
if lines == 0 {
lines = 1;
}
if lines >= remaining {
break;
}
remaining = remaining.saturating_sub(lines);
let _ = self.block_cache.advance(&mut handle);
}
}
let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
let mut next_tick = self
.block_interval
.as_ref()
.map(|dur| Instant::now() + *dur);
loop {
let total_bytes = self.block_cache.peek_next_size(&handle);

tokio::select! {
result = self.throttle.wait_for(total_bytes) => {
match result {
Ok(()) => {
if let Some(dur) = self.block_interval {
if let Some(deadline) = next_tick {
tokio::select! {
_ = tokio::time::sleep_until(deadline) => {},
() = &mut shutdown_wait => {
fp.flush().await?;
info!("shutdown signal received");
return Ok(());
},
}
next_tick = Some(deadline + dur);
} else {
next_tick = Some(Instant::now() + dur);
}
}

let block = self.block_cache.advance(&mut handle);
let total_bytes = u64::from(total_bytes.get());

@@ -318,6 +373,9 @@ impl Child {
counter!("bytes_written").increment(total_bytes);
total_bytes_written += total_bytes;
}
if self.flush_each_block {
fp.flush().await?;
}

if total_bytes_written > maximum_bytes_per_file {
fp.flush().await?;
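
The pacing logic above computes each deadline from the previous one (`deadline + dur`) rather than from `Instant::now()`, so time spent writing a block does not accumulate as drift. A self-contained sketch of that pattern, assuming tokio with the `time`, `macros`, and `rt` features:

```rust
use tokio::time::{sleep_until, Duration, Instant};

#[tokio::main]
async fn main() {
    let period = Duration::from_millis(200);
    // First deadline is one period out; every later deadline is derived
    // from the previous deadline, not from "now".
    let mut next_tick = Instant::now() + period;
    for i in 0..3 {
        sleep_until(next_tick).await;
        println!("block {i}"); // stand-in for writing one block
        next_tick = next_tick + period;
    }
}
```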
2 changes: 2 additions & 0 deletions lading_payload/Cargo.toml
@@ -28,6 +28,7 @@ rand = { workspace = true, features = [
"std",
"std_rng",
] }
chrono = { version = "0.4", default-features = true, features = ["std"] }
rmp-serde = { version = "1.1", default-features = false }
serde = { workspace = true }
serde_json = { workspace = true }
@@ -42,6 +43,7 @@ arbitrary = { version = "1", optional = true, features = ["derive"] }
proptest = { workspace = true }
proptest-derive = { workspace = true }
criterion = { version = "0.7", features = ["html_reports"] }
tempfile = { workspace = true }

[features]
default = []
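
`chrono` enters the dependency list to back the new `StaticSecond` payload's timestamp parsing (see `lib.rs` below). A sketch of the kind of prefix parse a chrono format string enables; the line, format, and 19-byte prefix width are illustrative, and the actual grouping code is not shown in this diff:

```rust
use chrono::NaiveDateTime;

fn main() {
    let line = "2024-01-15 08:30:02 INFO starting up";
    let format = "%Y-%m-%d %H:%M:%S";
    // parse_from_str rejects trailing input, so split off the prefix this
    // format covers (19 bytes here) before parsing.
    let (prefix, rest) = line.split_at(19);
    let ts = NaiveDateTime::parse_from_str(prefix, format).expect("timestamp parses");
    // Epoch second: a natural key for grouping lines into one-second blocks.
    println!("second {} -> {:?}", ts.and_utc().timestamp(), rest.trim_start());
}
```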
56 changes: 56 additions & 0 deletions lading_payload/src/block.rs
@@ -23,6 +23,12 @@ pub enum SpinError {
/// Static payload creation error
#[error(transparent)]
Static(#[from] crate::statik::Error),
/// Static line-rate payload creation error
#[error(transparent)]
StaticLinesPerSecond(#[from] crate::statik_line_rate::Error),
/// Static second-grouped payload creation error
#[error(transparent)]
StaticSecond(#[from] crate::statik_second::Error),
/// rng slice is Empty
#[error("RNG slice is empty")]
EmptyRng,
@@ -55,6 +61,12 @@ pub enum Error {
/// Static payload creation error
#[error(transparent)]
Static(#[from] crate::statik::Error),
/// Static line-rate payload creation error
#[error(transparent)]
StaticLinesPerSecond(#[from] crate::statik_line_rate::Error),
/// Static second-grouped payload creation error
#[error(transparent)]
StaticSecond(#[from] crate::statik_second::Error),
/// Error for crate deserialization
#[error("Deserialization error: {0}")]
Deserialize(#[from] crate::Error),
@@ -337,6 +349,42 @@ impl Cache {
total_bytes.get(),
)?
}
crate::Config::StaticLinesPerSecond {
static_path,
lines_per_second,
} => {
let span = span!(Level::INFO, "fixed", payload = "static-lines-per-second");
let _guard = span.enter();
let mut serializer =
crate::StaticLinesPerSecond::new(static_path, *lines_per_second)?;
construct_block_cache_inner(
&mut rng,
&mut serializer,
maximum_block_bytes,
total_bytes.get(),
)?
}
crate::Config::StaticSecond {
static_path,
timestamp_format,
emit_placeholder,
start_line_index,
} => {
let span = span!(Level::INFO, "fixed", payload = "static-second");
let _guard = span.enter();
let mut serializer = crate::StaticSecond::new(
static_path,
&timestamp_format,
*emit_placeholder,
start_line_index.unwrap_or(0),
)?;
construct_block_cache_inner(
&mut rng,
&mut serializer,
maximum_block_bytes,
total_bytes.get(),
)?
}
crate::Config::OpentelemetryTraces => {
let mut pyld = crate::OpentelemetryTraces::new(&mut rng);
let span = span!(Level::INFO, "fixed", payload = "otel-traces");
@@ -409,6 +457,14 @@
}
}

/// Get the number of blocks in the cache.
#[must_use]
pub fn len(&self) -> usize {
match self {
Self::Fixed { blocks, .. } => blocks.len(),
}
}

/// Get metadata of the next block without advancing.
#[must_use]
pub fn peek_next_metadata(&self, handle: &Handle) -> BlockMetadata {
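
The new error variants follow the `#[error(transparent)]` + `#[from]` pattern already used for `Static`: thiserror derives a `From` impl so `?` lifts a payload-level error into the aggregate enum without rewrapping its message. In miniature, with a hypothetical leaf error standing in for the real ones in `statik_line_rate` / `statik_second`:

```rust
use thiserror::Error;

// Hypothetical leaf error; the real ones live in the new payload modules.
#[derive(Debug, Error)]
#[error("bad timestamp on line {line}")]
struct StaticSecondError {
    line: usize,
}

#[derive(Debug, Error)]
enum Error {
    // transparent + from: Display is delegated and `?` converts for free.
    #[error(transparent)]
    StaticSecond(#[from] StaticSecondError),
}

fn build() -> Result<(), Error> {
    let parsed: Result<(), StaticSecondError> = Err(StaticSecondError { line: 3 });
    parsed?; // lifted via the derived From impl
    Ok(())
}

fn main() {
    // Prints the leaf's message verbatim: "bad timestamp on line 3"
    println!("{}", build().unwrap_err());
}
```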
37 changes: 37 additions & 0 deletions lading_payload/src/lib.rs
@@ -27,6 +27,8 @@ pub use opentelemetry::metric::OpentelemetryMetrics;
pub use opentelemetry::trace::OpentelemetryTraces;
pub use splunk_hec::SplunkHec;
pub use statik::Static;
pub use statik_line_rate::StaticLinesPerSecond;
pub use statik_second::StaticSecond;
pub use syslog::Syslog5424;

pub mod apache_common;
@@ -40,6 +42,8 @@ pub mod opentelemetry;
pub mod procfs;
pub mod splunk_hec;
pub mod statik;
pub mod statik_line_rate;
pub mod statik_second;
pub mod syslog;
pub mod trace_agent;

@@ -129,6 +133,31 @@ pub enum Config {
/// assumed to be line-oriented but no other claim is made on the file.
static_path: PathBuf,
},
/// Generates static data but limits the number of lines emitted per block
StaticLinesPerSecond {
/// Defines the file path to read static variant data from. Content is
/// assumed to be line-oriented but no other claim is made on the file.
static_path: PathBuf,
/// Number of lines to emit in each generated block
lines_per_second: u32,
},
/// Generates static data grouped by second; each block contains one
/// second's worth of logs as determined by a parsed timestamp prefix.
StaticSecond {
/// Defines the file path to read static variant data from. Content is
/// assumed to be line-oriented.
static_path: PathBuf,
/// Chrono-compatible timestamp format string used to parse the leading
/// timestamp in each line.
timestamp_format: String,
/// Emit a minimal placeholder block (single newline) for seconds with
/// no lines. When false, empty seconds are skipped.
#[serde(default)]
emit_placeholder: bool,
/// Optional starting line offset; lines before this index are skipped.
#[serde(default)]
start_line_index: Option<u64>,
},
/// Generates a line of printable ascii characters
Ascii,
/// Generates a json encoded line
@@ -167,6 +196,10 @@ pub enum Payload {
SplunkHec(splunk_hec::SplunkHec),
/// Static file content
Static(Static),
/// Static file content with a fixed number of lines emitted per block
StaticLinesPerSecond(StaticLinesPerSecond),
/// Static file content grouped into one-second blocks based on timestamps
StaticSecond(StaticSecond),
/// Syslog RFC 5424 format
Syslog(Syslog5424),
/// OpenTelemetry traces
@@ -195,6 +228,8 @@ impl Serialize for Payload {
Payload::Json(ser) => ser.to_bytes(rng, max_bytes, writer),
Payload::SplunkHec(ser) => ser.to_bytes(rng, max_bytes, writer),
Payload::Static(ser) => ser.to_bytes(rng, max_bytes, writer),
Payload::StaticLinesPerSecond(ser) => ser.to_bytes(rng, max_bytes, writer),
Payload::StaticSecond(ser) => ser.to_bytes(rng, max_bytes, writer),
Payload::Syslog(ser) => ser.to_bytes(rng, max_bytes, writer),
Payload::OtelTraces(ser) => ser.to_bytes(rng, max_bytes, writer),
Payload::OtelLogs(ser) => ser.to_bytes(rng, max_bytes, writer),
@@ -207,6 +242,8 @@
fn data_points_generated(&self) -> Option<u64> {
match self {
Payload::OtelMetrics(ser) => ser.data_points_generated(),
Payload::StaticLinesPerSecond(ser) => ser.data_points_generated(),
Payload::StaticSecond(ser) => ser.data_points_generated(),
// Other implementations use the default None
_ => None,
}
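
`data_points_generated` is the hook that makes the generator-side `start_line_index` skip work: payloads that can count their lines report `Some(n)`, everything else falls through to the default `None`. A miniature of that dispatch (where the count comes from is simplified to a stored field):

```rust
#[derive(Debug)]
enum Payload {
    Ascii,
    StaticLinesPerSecond { lines_per_block: u64 },
}

impl Payload {
    fn data_points_generated(&self) -> Option<u64> {
        match self {
            // Only counting payloads report a number...
            Payload::StaticLinesPerSecond { lines_per_block } => Some(*lines_per_block),
            // ...everything else keeps the default None, which the file
            // generator treats as "at least one line" when skipping ahead.
            _ => None,
        }
    }
}

fn main() {
    assert_eq!(Payload::Ascii.data_points_generated(), None);
    let p = Payload::StaticLinesPerSecond { lines_per_block: 100 };
    assert_eq!(p.data_points_generated(), Some(100));
}
```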