diff --git a/lib/prometheus-parser/src/lib.rs b/lib/prometheus-parser/src/lib.rs index b97f65a33eaab..17c1bd29dc715 100644 --- a/lib/prometheus-parser/src/lib.rs +++ b/lib/prometheus-parser/src/lib.rs @@ -605,7 +605,8 @@ mod test { assert!(matches!( error, ParserError::WithLine { - kind: ErrorKind::ExpectedChar { expected: ',', .. }, .. + kind: ErrorKind::ExpectedChar { expected: ',', .. }, + .. } )); @@ -614,7 +615,8 @@ mod test { assert!(matches!( error, ParserError::WithLine { - kind: ErrorKind::InvalidMetricKind { .. }, .. + kind: ErrorKind::InvalidMetricKind { .. }, + .. } )); @@ -623,7 +625,8 @@ mod test { assert!(matches!( error, ParserError::WithLine { - kind: ErrorKind::ExpectedSpace { .. }, .. + kind: ErrorKind::ExpectedSpace { .. }, + .. } )); @@ -632,7 +635,8 @@ mod test { assert!(matches!( error, ParserError::WithLine { - kind: ErrorKind::ExpectedChar { expected: '"', .. }, .. + kind: ErrorKind::ExpectedChar { expected: '"', .. }, + .. } )); @@ -641,7 +645,8 @@ mod test { assert!(matches!( error, ParserError::WithLine { - kind: ErrorKind::ExpectedChar { expected: '"', .. }, .. + kind: ErrorKind::ExpectedChar { expected: '"', .. }, + .. } )); @@ -650,7 +655,8 @@ mod test { assert!(matches!( error, ParserError::WithLine { - kind: ErrorKind::ParseFloatError { .. }, .. + kind: ErrorKind::ParseFloatError { .. }, + .. } )); } diff --git a/lib/prometheus-parser/src/line.rs b/lib/prometheus-parser/src/line.rs index b0bb7597d86e6..9b6d5f09ffe08 100644 --- a/lib/prometheus-parser/src/line.rs +++ b/lib/prometheus-parser/src/line.rs @@ -620,10 +620,7 @@ mod test { let input = wrap(r#"{ a="b" ,, c="d" }"#); let error = Metric::parse_labels(&input).unwrap_err().into(); - assert!(matches!( - error, - ErrorKind::ParseNameError { .. } - )); + assert!(matches!(error, ErrorKind::ParseNameError { .. })); } #[test] diff --git a/lib/shared/src/conversion.rs b/lib/shared/src/conversion.rs index f97c5c4bb74a2..f8475be2b141f 100644 --- a/lib/shared/src/conversion.rs +++ b/lib/shared/src/conversion.rs @@ -184,11 +184,11 @@ fn parse_bool(s: &str) -> Result { /// Does the format specifier have a time zone option? fn format_has_zone(fmt: &str) -> bool { - fmt.find("%Z").is_some() - || fmt.find("%z").is_some() - || fmt.find("%:z").is_some() - || fmt.find("%#z").is_some() - || fmt.find("%+").is_some() + fmt.contains("%Z") + || fmt.contains("%z") + || fmt.contains("%:z") + || fmt.contains("%#z") + || fmt.contains("%+") } /// Convert a timestamp with a non-UTC time zone into UTC diff --git a/lib/vrl/cli/src/cmd.rs b/lib/vrl/cli/src/cmd.rs index 14fd817a532db..03297ff2f1cd6 100644 --- a/lib/vrl/cli/src/cmd.rs +++ b/lib/vrl/cli/src/cmd.rs @@ -73,14 +73,13 @@ fn run(opts: &Opts) -> Result<(), Error> { } } -#[cfg(feature = "repl")] fn repl(objects: Vec) -> Result<(), Error> { - repl::run(objects) -} - -#[cfg(not(feature = "repl"))] -fn repl(_: Vec) -> Result<(), Error> { - Err(Error::ReplFeature) + if cfg!(feature = "repl") { + repl::run(objects); + Ok(()) + } else { + Err(Error::ReplFeature) + } } fn execute(object: &mut impl Target, source: String) -> Result { diff --git a/lib/vrl/cli/src/lib.rs b/lib/vrl/cli/src/lib.rs index 1ad764a42af61..b824333b3c21c 100644 --- a/lib/vrl/cli/src/lib.rs +++ b/lib/vrl/cli/src/lib.rs @@ -18,7 +18,6 @@ pub enum Error { #[error("json error")] Json(#[from] serde_json::Error), - #[cfg(not(feature = "repl"))] #[error("repl feature disabled, program input required")] ReplFeature, } diff --git a/lib/vrl/cli/src/repl.rs b/lib/vrl/cli/src/repl.rs index a2023b6273c22..191b52915baf7 100644 --- a/lib/vrl/cli/src/repl.rs +++ b/lib/vrl/cli/src/repl.rs @@ -1,4 +1,3 @@ -use crate::Error; use indoc::indoc; use prettytable::{format, Cell, Row, Table}; use regex::Regex; @@ -13,7 +12,7 @@ use vrl::{diagnostic::Formatter, state, Runtime, Target, Value}; const DOCS_URL: &str = "https://vector.dev/docs/reference/vrl"; -pub(crate) fn run(mut objects: Vec) -> Result<(), Error> { +pub(crate) fn run(mut objects: Vec) { let mut index = 0; let func_docs_regex = Regex::new(r"^help\sdocs\s(\w{1,})$").unwrap(); @@ -82,8 +81,6 @@ pub(crate) fn run(mut objects: Vec) -> Result<(), Error> { } } } - - Ok(()) } fn resolve( diff --git a/lib/vrl/parser/src/lex.rs b/lib/vrl/parser/src/lex.rs index 5c97a7c676e43..80ea444c0f37a 100644 --- a/lib/vrl/parser/src/lex.rs +++ b/lib/vrl/parser/src/lex.rs @@ -5,7 +5,8 @@ use std::iter::Peekable; use std::str::CharIndices; pub type Tok<'input> = Token<&'input str>; -pub type Spanned<'input, Loc> = Result<(Loc, Tok<'input>, Loc), Error>; +pub type SpannedResult<'input, Loc> = Result, Error>; +pub type Spanned<'input, Loc> = (Loc, Tok<'input>, Loc); #[derive(thiserror::Error, Clone, Debug, PartialEq)] pub enum Error { @@ -447,7 +448,7 @@ impl StringLiteral<&str> { // ----------------------------------------------------------------------------- impl<'input> Iterator for Lexer<'input> { - type Item = Spanned<'input, usize>; + type Item = SpannedResult<'input, usize>; fn next(&mut self) -> Option { use Token::*; @@ -461,7 +462,7 @@ impl<'input> Iterator for Lexer<'input> { // represent a physical character, instead it is a boundary marker. if self.query_start(start) { // dbg!("LQuery"); // NOTE: uncomment this for debugging - return self.token2(start, start + 1, LQuery); + return Some(Ok(self.token2(start, start + 1, LQuery))); } // Check if we need to emit a `RQuery` token. @@ -470,7 +471,7 @@ impl<'input> Iterator for Lexer<'input> { // represent a physical character, instead it is a boundary marker. if let Some(pos) = self.query_end(start) { // dbg!("RQuery"); // NOTE: uncomment this for debugging - return self.token2(pos, pos + 1, RQuery); + return Some(Ok(self.token2(pos, pos + 1, RQuery))); } // Advance the internal iterator and emit the next token, or loop @@ -479,26 +480,28 @@ impl<'input> Iterator for Lexer<'input> { let result = match ch { '"' => Some(self.string_literal(start)), - ';' => self.token(start, SemiColon), - '\n' => self.token(start, Newline), - '\\' => self.token(start, Escape), + ';' => Some(Ok(self.token(start, SemiColon))), + '\n' => Some(Ok(self.token(start, Newline))), + '\\' => Some(Ok(self.token(start, Escape))), - '(' => self.open(start, LParen), - '[' => self.open(start, LBracket), - '{' => self.open(start, LBrace), - '}' => self.close(start, RBrace), - ']' => self.close(start, RBracket), - ')' => self.close(start, RParen), + '(' => Some(Ok(self.open(start, LParen))), + '[' => Some(Ok(self.open(start, LBracket))), + '{' => Some(Ok(self.open(start, LBrace))), + '}' => Some(Ok(self.close(start, RBrace))), + ']' => Some(Ok(self.close(start, RBracket))), + ')' => Some(Ok(self.close(start, RParen))), - '.' => self.token(start, Dot), - ':' => self.token(start, Colon), - ',' => self.token(start, Comma), + '.' => Some(Ok(self.token(start, Dot))), + ':' => Some(Ok(self.token(start, Colon))), + ',' => Some(Ok(self.token(start, Comma))), - '_' if self.test_peek(char::is_alphabetic) => Some(self.internal_test(start)), - '_' => self.token(start, Underscore), + '_' if self.test_peek(char::is_alphabetic) => { + Some(Ok(self.internal_test(start))) + } + '_' => Some(Ok(self.token(start, Underscore))), '!' if self.test_peek(|ch| ch == '!' || !is_operator(ch)) => { - self.token(start, Bang) + Some(Ok(self.token(start, Bang))) } '#' => { @@ -510,14 +513,14 @@ impl<'input> Iterator for Lexer<'input> { 's' if self.test_peek(|ch| ch == '\'') => Some(self.raw_string_literal(start)), 't' if self.test_peek(|ch| ch == '\'') => Some(self.timestamp_literal(start)), - ch if is_ident_start(ch) => Some(self.identifier_or_function_call(start)), + ch if is_ident_start(ch) => Some(Ok(self.identifier_or_function_call(start))), ch if is_digit(ch) || (ch == '-' && self.test_peek(is_digit)) => { Some(self.numeric_literal(start)) } - ch if is_operator(ch) => Some(self.operator(start)), + ch if is_operator(ch) => Some(Ok(self.operator(start))), ch if ch.is_whitespace() => continue, - ch => self.token(start, InvalidToken(ch)), + ch => Some(Ok(self.token(start, InvalidToken(ch)))), }; // dbg!(&result); // NOTE: uncomment this for debugging @@ -529,7 +532,7 @@ impl<'input> Iterator for Lexer<'input> { // queries. } else if let Some(end) = self.rquery_indices.pop() { // dbg!("RQuery"); // NOTE: uncomment this for debugging - return self.token2(end, end + 1, RQuery); + return Some(Ok(self.token2(end, end + 1, RQuery))); } return None; @@ -542,7 +545,7 @@ impl<'input> Iterator for Lexer<'input> { // ----------------------------------------------------------------------------- impl<'input> Lexer<'input> { - fn open(&mut self, start: usize, token: Token<&'input str>) -> Option> { + fn open(&mut self, start: usize, token: Token<&'input str>) -> Spanned<'input, usize> { match &token { Token::LParen => self.open_parens += 1, Token::LBracket => self.open_brackets += 1, @@ -553,7 +556,7 @@ impl<'input> Lexer<'input> { self.token(start, token) } - fn close(&mut self, start: usize, token: Token<&'input str>) -> Option> { + fn close(&mut self, start: usize, token: Token<&'input str>) -> Spanned<'input, usize> { match &token { Token::RParen => self.open_parens = self.open_parens.saturating_sub(1), Token::RBracket => self.open_brackets = self.open_brackets.saturating_sub(1), @@ -564,7 +567,7 @@ impl<'input> Lexer<'input> { self.token(start, token) } - fn token(&mut self, start: usize, token: Token<&'input str>) -> Option> { + fn token(&mut self, start: usize, token: Token<&'input str>) -> Spanned<'input, usize> { let end = self.next_index(); self.token2(start, end, token) } @@ -574,8 +577,8 @@ impl<'input> Lexer<'input> { start: usize, end: usize, token: Token<&'input str>, - ) -> Option> { - Some(Ok((start, token, end))) + ) -> Spanned<'input, usize> { + (start, token, end) } fn query_end(&mut self, start: usize) -> Option { @@ -640,7 +643,7 @@ impl<'input> Lexer<'input> { let mut end = 0; while let Some((pos, ch)) = chars.next() { let take_until_end = - |result: Spanned<'input, usize>, + |result: SpannedResult<'input, usize>, last_char: &mut Option, end: &mut usize, chars: &mut Peekable>| { @@ -735,7 +738,7 @@ impl<'input> Lexer<'input> { while let Some((pos, ch)) = chars.peek() { let pos = *pos; - let literal_check = |result: Spanned<'input, usize>, chars: &mut Peekable>| match result { + let literal_check = |result: SpannedResult<'input, usize>, chars: &mut Peekable>| match result { Err(_) => Err(()), Ok((_, _, new)) => { #[allow(clippy::while_let_on_iterator)] @@ -854,7 +857,7 @@ impl<'input> Lexer<'input> { true } - fn string_literal(&mut self, start: usize) -> Spanned<'input, usize> { + fn string_literal(&mut self, start: usize) -> SpannedResult<'input, usize> { let content_start = self.next_index(); loop { @@ -876,19 +879,19 @@ impl<'input> Lexer<'input> { Err(Error::StringLiteral { start }) } - fn regex_literal(&mut self, start: usize) -> Spanned<'input, usize> { + fn regex_literal(&mut self, start: usize) -> SpannedResult<'input, usize> { self.quoted_literal(start, Token::RegexLiteral) } - fn raw_string_literal(&mut self, start: usize) -> Spanned<'input, usize> { + fn raw_string_literal(&mut self, start: usize) -> SpannedResult<'input, usize> { self.quoted_literal(start, |c| Token::StringLiteral(StringLiteral::Raw(c))) } - fn timestamp_literal(&mut self, start: usize) -> Spanned<'input, usize> { + fn timestamp_literal(&mut self, start: usize) -> SpannedResult<'input, usize> { self.quoted_literal(start, Token::TimestampLiteral) } - fn numeric_literal(&mut self, start: usize) -> Spanned<'input, usize> { + fn numeric_literal(&mut self, start: usize) -> SpannedResult<'input, usize> { let (end, int) = self.take_while(start, |ch| is_digit(ch) || ch == '_'); match self.peek() { @@ -928,7 +931,7 @@ impl<'input> Lexer<'input> { Token::ident(ident) }; - Ok((start, token, end)) + (start, token, end) } fn operator(&mut self, start: usize) -> Spanned<'input, usize> { @@ -941,21 +944,21 @@ impl<'input> Lexer<'input> { op => Token::Operator(op), }; - Ok((start, token, end)) + (start, token, end) } fn internal_test(&mut self, start: usize) -> Spanned<'input, usize> { self.bump(); let (end, test) = self.take_while(start, char::is_alphabetic); - Ok((start, Token::InternalTest(test), end)) + (start, Token::InternalTest(test), end) } fn quoted_literal( &mut self, start: usize, tok: impl Fn(&'input str) -> Tok<'input>, - ) -> Spanned<'input, usize> { + ) -> SpannedResult<'input, usize> { self.bump(); let content_start = self.next_index(); @@ -1122,7 +1125,7 @@ mod test { use super::*; use crate::lex::Token::*; - fn lexer(input: &str) -> impl Iterator> + '_ { + fn lexer(input: &str) -> impl Iterator> + '_ { let mut lexer = Lexer::new(input); Box::new(std::iter::from_fn(move || Some(lexer.next()?))) } diff --git a/lib/vrl/stdlib/src/parse_aws_vpc_flow_log.rs b/lib/vrl/stdlib/src/parse_aws_vpc_flow_log.rs index 8b9c9ba0aad54..24d33164fee1f 100644 --- a/lib/vrl/stdlib/src/parse_aws_vpc_flow_log.rs +++ b/lib/vrl/stdlib/src/parse_aws_vpc_flow_log.rs @@ -130,6 +130,7 @@ impl Expression for ParseAwsVpcFlowLogFn { type ParseResult = std::result::Result; +#[allow(clippy::unnecessary_wraps)] // match other parse methods fn identity<'a>(_key: &'a str, value: &'a str) -> ParseResult<&'a str> { Ok(value) } diff --git a/lib/vrl/stdlib/src/parse_common_log.rs b/lib/vrl/stdlib/src/parse_common_log.rs index 13d4336c995c6..0f4b6550b33dc 100644 --- a/lib/vrl/stdlib/src/parse_common_log.rs +++ b/lib/vrl/stdlib/src/parse_common_log.rs @@ -247,22 +247,22 @@ mod tests { test_type_def![ value_string { expr: |_| ParseCommonLogFn { value: Literal::from("foo").boxed(), timestamp_format: None }, - def: TypeDef { fallible: true, kind: value::Kind::Map, inner_type_def: inner_type_def() }, + def: TypeDef { fallible: true, kind: value::Kind::Map, inner_type_def: Some(inner_type_def()) }, } value_non_string { expr: |_| ParseCommonLogFn { value: Literal::from(1).boxed(), timestamp_format: None }, - def: TypeDef { fallible: true, kind: value::Kind::Map, inner_type_def: inner_type_def() }, + def: TypeDef { fallible: true, kind: value::Kind::Map, inner_type_def: Some(inner_type_def()) }, } timestamp_format_string { expr: |_| ParseCommonLogFn { value: Literal::from("foo").boxed(), timestamp_format: Some(Literal::from("foo").boxed()) }, - def: TypeDef { fallible: true, kind: value::Kind::Map, inner_type_def: inner_type_def() }, + def: TypeDef { fallible: true, kind: value::Kind::Map, inner_type_def: Some(inner_type_def()) }, } timestamp_format_non_string { expr: |_| ParseCommonLogFn { value: Literal::from("foo").boxed(), timestamp_format: Some(Literal::from(1).boxed()) }, - def: TypeDef { fallible: true, kind: value::Kind::Map, inner_type_def: inner_type_def() }, + def: TypeDef { fallible: true, kind: value::Kind::Map, inner_type_def: Some(inner_type_def()) }, } ]; } diff --git a/lib/vrl/stdlib/src/parse_glog.rs b/lib/vrl/stdlib/src/parse_glog.rs index 05df8f475f8e5..4b3f54c28fa24 100644 --- a/lib/vrl/stdlib/src/parse_glog.rs +++ b/lib/vrl/stdlib/src/parse_glog.rs @@ -190,17 +190,17 @@ mod tests { test_type_def![ value_string { expr: |_| ParseGlogFn { value: Literal::from("foo").boxed() }, - def: TypeDef { fallible: true, kind: value::Kind::Map, inner_type_def: inner_type_def() }, + def: TypeDef { fallible: true, kind: value::Kind::Map, inner_type_def: Some(inner_type_def()) }, } value_non_string { expr: |_| ParseGlogFn { value: Literal::from(1).boxed() }, - def: TypeDef { fallible: true, kind: value::Kind::Map, inner_type_def: inner_type_def() }, + def: TypeDef { fallible: true, kind: value::Kind::Map, inner_type_def: Some(inner_type_def()) }, } value_optional { expr: |_| ParseGlogFn { value: Box::new(Noop) }, - def: TypeDef { fallible: true, kind: value::Kind::Map, inner_type_def: inner_type_def() }, + def: TypeDef { fallible: true, kind: value::Kind::Map, inner_type_def: Some(inner_type_def()) }, } ]; } diff --git a/rust-toolchain b/rust-toolchain index 7f3a46a841e5d..5a5c7211dc68e 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -1.49.0 +1.50.0 diff --git a/src/api/schema/components/sink.rs b/src/api/schema/components/sink.rs index a71b096ed6eab..f03ffa7b205e9 100644 --- a/src/api/schema/components/sink.rs +++ b/src/api/schema/components/sink.rs @@ -109,7 +109,7 @@ impl Sink { /// Sink metrics pub async fn metrics(&self) -> metrics::SinkMetrics { - metrics::by_component_name(self.get_name()).to_sink_metrics(self.get_component_type()) + metrics::by_component_name(self.get_name()).into_sink_metrics(self.get_component_type()) } } diff --git a/src/api/schema/components/source.rs b/src/api/schema/components/source.rs index 9594860300b68..2daedaf483fba 100644 --- a/src/api/schema/components/source.rs +++ b/src/api/schema/components/source.rs @@ -107,7 +107,7 @@ impl Source { /// Source metrics pub async fn metrics(&self) -> metrics::SourceMetrics { - metrics::by_component_name(&self.get_name()).to_source_metrics(&self.get_component_type()) + metrics::by_component_name(&self.get_name()).into_source_metrics(&self.get_component_type()) } } diff --git a/src/api/schema/components/transform.rs b/src/api/schema/components/transform.rs index c42abad9a91cc..8ce4f118e8159 100644 --- a/src/api/schema/components/transform.rs +++ b/src/api/schema/components/transform.rs @@ -88,7 +88,7 @@ impl Transform { /// Transform metrics pub async fn metrics(&self) -> metrics::TransformMetrics { - metrics::by_component_name(&self.0.name).to_transform_metrics(&self.get_component_type()) + metrics::by_component_name(&self.0.name).into_transform_metrics(&self.get_component_type()) } } diff --git a/src/api/schema/metrics/sink/mod.rs b/src/api/schema/metrics/sink/mod.rs index 394d12d4ac4e0..32863dde38a45 100644 --- a/src/api/schema/metrics/sink/mod.rs +++ b/src/api/schema/metrics/sink/mod.rs @@ -14,11 +14,11 @@ pub enum SinkMetrics { } pub trait IntoSinkMetrics { - fn to_sink_metrics(self, component_type: &str) -> SinkMetrics; + fn into_sink_metrics(self, component_type: &str) -> SinkMetrics; } impl IntoSinkMetrics for Vec { - fn to_sink_metrics(self, _component_type: &str) -> SinkMetrics { + fn into_sink_metrics(self, _component_type: &str) -> SinkMetrics { SinkMetrics::GenericSinkMetrics(generic::GenericSinkMetrics::new(self)) } } diff --git a/src/api/schema/metrics/source/mod.rs b/src/api/schema/metrics/source/mod.rs index 43c7fa4c9eb6d..15263ca6cf964 100644 --- a/src/api/schema/metrics/source/mod.rs +++ b/src/api/schema/metrics/source/mod.rs @@ -16,11 +16,11 @@ pub enum SourceMetrics { } pub trait IntoSourceMetrics { - fn to_source_metrics(self, component_type: &str) -> SourceMetrics; + fn into_source_metrics(self, component_type: &str) -> SourceMetrics; } impl IntoSourceMetrics for Vec { - fn to_source_metrics(self, component_type: &str) -> SourceMetrics { + fn into_source_metrics(self, component_type: &str) -> SourceMetrics { match component_type { "file" => SourceMetrics::FileSourceMetrics(file::FileSourceMetrics::new(self)), _ => SourceMetrics::GenericSourceMetrics(generic::GenericSourceMetrics::new(self)), diff --git a/src/api/schema/metrics/transform/mod.rs b/src/api/schema/metrics/transform/mod.rs index 09ea49dab26c4..149e384e7e505 100644 --- a/src/api/schema/metrics/transform/mod.rs +++ b/src/api/schema/metrics/transform/mod.rs @@ -14,11 +14,11 @@ pub enum TransformMetrics { } pub trait IntoTransformMetrics { - fn to_transform_metrics(self, component_type: &str) -> TransformMetrics; + fn into_transform_metrics(self, component_type: &str) -> TransformMetrics; } impl IntoTransformMetrics for Vec { - fn to_transform_metrics(self, _component_type: &str) -> TransformMetrics { + fn into_transform_metrics(self, _component_type: &str) -> TransformMetrics { TransformMetrics::GenericTransformMetrics(generic::GenericTransformMetrics::new(self)) } } diff --git a/src/internal_events/open.rs b/src/internal_events/open.rs index 111ff5d7ad4f0..d32c38cada1e5 100644 --- a/src/internal_events/open.rs +++ b/src/internal_events/open.rs @@ -69,13 +69,13 @@ fn gauge_add(gauge: &AtomicUsize, add: isize, emitter: impl Fn(usize)) { emitter(new_value); // Try to update gauge to new value and releasing writes to gauge metric in the process. // Otherwise acquire new writes to gauge metric. - let latest = gauge.compare_and_swap(value, new_value, Ordering::AcqRel); - if value == latest { + value = match gauge.compare_exchange(value, new_value, Ordering::AcqRel, Ordering::Acquire) + { // Success - break; + Ok(_) => break, + // Try again with new value + Err(v) => v, } - // Try again with new value - value = latest; } // In the worst case scenario we will emit `n^2 / 2` times when there are `n` parallel diff --git a/src/kubernetes/stream.rs b/src/kubernetes/stream.rs index 4974e8eded33c..319f49673ea63 100644 --- a/src/kubernetes/stream.rs +++ b/src/kubernetes/stream.rs @@ -121,7 +121,12 @@ mod tests { { let err = out_stream.next().await.unwrap().unwrap_err(); - assert!(matches!(err, Error::Reading { source: hyper::Error { .. } })); + assert!(matches!( + err, + Error::Reading { + source: hyper::Error { .. } + } + )); } assert!(out_stream.next().await.is_none()); @@ -139,7 +144,12 @@ mod tests { { let err = out_stream.next().await.unwrap().unwrap_err(); - assert!(matches!(err, Error::Parsing { source: response::Error::Json(_) })); + assert!(matches!( + err, + Error::Parsing { + source: response::Error::Json(_) + } + )); } assert!(out_stream.next().await.is_none()); diff --git a/src/mapping/query/function/split.rs b/src/mapping/query/function/split.rs index cea5e41ef1c27..3bc9e7ab65153 100644 --- a/src/mapping/query/function/split.rs +++ b/src/mapping/query/function/split.rs @@ -23,6 +23,7 @@ impl SplitFn { } impl Function for SplitFn { + #[allow(clippy::collapsible_match)] // I expect this file to be going away shortly fn execute(&self, ctx: &Event) -> Result { let string = { let bytes = required_value!(ctx, self.path, Value::Bytes(v) => v); @@ -71,10 +72,7 @@ impl Function for SplitFn { }, Parameter { keyword: "pattern", - accepts: |v| { - matches!(v, QueryValue::Value(Value::Bytes(_)) - | QueryValue::Regex(_)) - }, + accepts: |v| matches!(v, QueryValue::Value(Value::Bytes(_)) | QueryValue::Regex(_)), required: true, }, Parameter { diff --git a/src/rusoto/auth.rs b/src/rusoto/auth.rs index 66e17172f7785..7c6bba031cd4b 100644 --- a/src/rusoto/auth.rs +++ b/src/rusoto/auth.rs @@ -101,7 +101,7 @@ mod tests { ) .unwrap(); - assert!(matches!(config.auth, AWSAuthentication::Role{..})); + assert!(matches!(config.auth, AWSAuthentication::Role { .. })); } #[test] @@ -130,6 +130,6 @@ mod tests { ) .unwrap(); - assert!(matches!(config.auth, AWSAuthentication::Static{..})); + assert!(matches!(config.auth, AWSAuthentication::Static { .. })); } } diff --git a/src/sinks/aws_kinesis_firehose.rs b/src/sinks/aws_kinesis_firehose.rs index 1529d18434956..2597e4f259eb8 100644 --- a/src/sinks/aws_kinesis_firehose.rs +++ b/src/sinks/aws_kinesis_firehose.rs @@ -162,7 +162,7 @@ impl KinesisFirehoseService { cx.acker(), ) .sink_map_err(|error| error!(message = "Fatal kinesis firehose sink error.", %error)) - .with_flat_map(move |e| stream::iter(encode_event(e, &encoding)).map(Ok)); + .with_flat_map(move |e| stream::iter(Some(encode_event(e, &encoding))).map(Ok)); Ok(sink) } @@ -240,7 +240,7 @@ enum HealthcheckError { StreamNamesMismatch { name: String, stream_name: String }, } -fn encode_event(mut event: Event, encoding: &EncodingConfig) -> Option { +fn encode_event(mut event: Event, encoding: &EncodingConfig) -> Record { encoding.apply_rules(&mut event); let log = event.into_log(); let data = match encoding.codec() { @@ -254,7 +254,7 @@ fn encode_event(mut event: Event, encoding: &EncodingConfig) -> Option let data = Bytes::from(data); - Some(Record { data }) + Record { data } } #[cfg(test)] @@ -270,7 +270,7 @@ mod tests { #[test] fn firehose_encode_event_text() { let message = "hello world".to_string(); - let event = encode_event(message.clone().into(), &Encoding::Text.into()).unwrap(); + let event = encode_event(message.clone().into(), &Encoding::Text.into()); assert_eq!(&event.data[..], message.as_bytes()); } @@ -280,7 +280,7 @@ mod tests { let message = "hello world".to_string(); let mut event = Event::from(message.clone()); event.as_mut_log().insert("key", "value"); - let event = encode_event(event, &Encoding::Json.into()).unwrap(); + let event = encode_event(event, &Encoding::Json.into()); let map: BTreeMap = serde_json::from_slice(&event.data[..]).unwrap(); diff --git a/src/sinks/aws_s3.rs b/src/sinks/aws_s3.rs index d16707314c9dc..68ecfbc5ea3cf 100644 --- a/src/sinks/aws_s3.rs +++ b/src/sinks/aws_s3.rs @@ -701,7 +701,7 @@ mod integration_tests { assert_downcast_matches!( config.healthcheck(client).await.unwrap_err(), HealthcheckError, - HealthcheckError::UnknownBucket{ .. } + HealthcheckError::UnknownBucket { .. } ); } diff --git a/src/sinks/http.rs b/src/sinks/http.rs index 3ba28c73d6436..1adeefb78bd7c 100644 --- a/src/sinks/http.rs +++ b/src/sinks/http.rs @@ -376,7 +376,7 @@ mod tests { assert_downcast_matches!( super::validate_headers(&config.request.headers, &None).unwrap_err(), BuildError, - BuildError::InvalidHeaderName{..} + BuildError::InvalidHeaderName { .. } ); } diff --git a/src/sinks/nats.rs b/src/sinks/nats.rs index b8fca85cc82dc..f7adb367b4557 100644 --- a/src/sinks/nats.rs +++ b/src/sinks/nats.rs @@ -85,18 +85,16 @@ impl SinkConfig for NatsSinkConfig { } impl NatsSinkConfig { - fn to_nats_options(&self) -> crate::Result { + fn to_nats_options(&self) -> nats::Options { // Set reconnect_buffer_size on the nats client to 0 bytes so that the // client doesn't buffer internally (to avoid message loss). - let options = nats::Options::new() + nats::Options::new() .with_name(&self.name) - .reconnect_buffer_size(0); - - Ok(options) + .reconnect_buffer_size(0) } async fn connect(&self) -> crate::Result { - self.to_nats_options()? + self.to_nats_options() .connect_async(&self.url) .map_err(|e| e.into()) .await diff --git a/src/sinks/papertrail.rs b/src/sinks/papertrail.rs index 709d912f20e29..71f44815cef82 100644 --- a/src/sinks/papertrail.rs +++ b/src/sinks/papertrail.rs @@ -65,7 +65,7 @@ impl SinkConfig for PapertrailConfig { let sink_config = TcpSinkConfig::new(address, self.keepalive, tls, self.send_buffer_bytes); - sink_config.build(cx, move |event| encode_event(event, pid, &encoding)) + sink_config.build(cx, move |event| Some(encode_event(event, pid, &encoding))) } fn input_type(&self) -> DataType { @@ -77,7 +77,7 @@ impl SinkConfig for PapertrailConfig { } } -fn encode_event(mut event: Event, pid: u32, encoding: &EncodingConfig) -> Option { +fn encode_event(mut event: Event, pid: u32, encoding: &EncodingConfig) -> Bytes { let host = if let Some(host) = event.as_mut_log().remove(log_schema().host_key()) { Some(host.to_string_lossy()) } else { @@ -110,7 +110,7 @@ fn encode_event(mut event: Event, pid: u32, encoding: &EncodingConfig) s.push(b'\n'); - Some(Bytes::from(s)) + Bytes::from(s) } #[cfg(test)] @@ -137,8 +137,7 @@ mod tests { except_fields: Some(vec!["magic".into()]), timestamp_format: None, }, - ) - .unwrap(); + ); let msg = bytes.slice(String::from_utf8_lossy(&bytes).find(": ").unwrap() + 2..bytes.len() - 1); diff --git a/src/sinks/util/adaptive_concurrency/tests.rs b/src/sinks/util/adaptive_concurrency/tests.rs index 4ec6c743d8e25..ec5d465d05174 100644 --- a/src/sinks/util/adaptive_concurrency/tests.rs +++ b/src/sinks/util/adaptive_concurrency/tests.rs @@ -406,24 +406,40 @@ async fn run_test(params: TestParams) -> TestResults { .map(|event| (event.name().to_string(), event)) .collect::>(); // Ensure basic statistics are captured, don't actually examine them - assert!( - matches!(metrics.get("adaptive_concurrency_observed_rtt").unwrap().data.value, - MetricValue::Distribution { .. }) - ); - assert!( - matches!(metrics.get("adaptive_concurrency_averaged_rtt").unwrap().data.value, - MetricValue::Distribution { .. }) - ); + assert!(matches!( + metrics + .get("adaptive_concurrency_observed_rtt") + .unwrap() + .data + .value, + MetricValue::Distribution { .. } + )); + assert!(matches!( + metrics + .get("adaptive_concurrency_averaged_rtt") + .unwrap() + .data + .value, + MetricValue::Distribution { .. } + )); if params.concurrency == Concurrency::Adaptive { - assert!( - matches!(metrics.get("adaptive_concurrency_limit").unwrap().data.value, - MetricValue::Distribution { .. }) - ); + assert!(matches!( + metrics + .get("adaptive_concurrency_limit") + .unwrap() + .data + .value, + MetricValue::Distribution { .. } + )); } - assert!( - matches!(metrics.get("adaptive_concurrency_in_flight").unwrap().data.value, - MetricValue::Distribution { .. }) - ); + assert!(matches!( + metrics + .get("adaptive_concurrency_in_flight") + .unwrap() + .data + .value, + MetricValue::Distribution { .. } + )); TestResults { stats, cstats } } diff --git a/src/sinks/util/buffer/metrics.rs b/src/sinks/util/buffer/metrics.rs index 37fdb36c16895..3d3ac4ce0c220 100644 --- a/src/sinks/util/buffer/metrics.rs +++ b/src/sinks/util/buffer/metrics.rs @@ -254,7 +254,7 @@ impl MetricSet { pub fn make_absolute(&mut self, metric: Metric) -> Option { match metric.data.kind { MetricKind::Absolute => Some(metric), - MetricKind::Incremental => self.incremental_to_absolute(metric), + MetricKind::Incremental => Some(self.incremental_to_absolute(metric)), } } @@ -270,7 +270,7 @@ impl MetricSet { /// Convert the incremental metric into an absolute one, using the /// state buffer to keep track of the value throughout the entire /// application uptime. - fn incremental_to_absolute(&mut self, metric: Metric) -> Option { + fn incremental_to_absolute(&mut self, metric: Metric) -> Metric { let mut entry = MetricEntry(metric.into_absolute()); let mut existing = self.0.take(&entry).unwrap_or_else(|| { // Start from zero value if the entry is not found. @@ -279,7 +279,7 @@ impl MetricSet { existing.data.value.add(&entry.data.value); entry.data.value = existing.data.value.clone(); self.0.insert(existing); - Some(entry.0) + entry.0 } /// Convert the absolute metric into an incremental by calculating diff --git a/src/sinks/vector.rs b/src/sinks/vector.rs index 4b352dc009528..54b4def1644ac 100644 --- a/src/sinks/vector.rs +++ b/src/sinks/vector.rs @@ -74,7 +74,7 @@ impl SinkConfig for VectorSinkConfig { self.send_buffer_bytes, ); - sink_config.build(cx, encode_event) + sink_config.build(cx, |event| Some(encode_event(event))) } fn input_type(&self) -> DataType { @@ -92,7 +92,7 @@ enum HealthcheckError { ConnectError { source: std::io::Error }, } -fn encode_event(event: Event) -> Option { +fn encode_event(event: Event) -> Bytes { let event = proto::EventWrapper::from(event); let event_len = event.encoded_len(); let full_len = event_len + 4; @@ -101,7 +101,7 @@ fn encode_event(event: Event) -> Option { out.put_u32(event_len as u32); event.encode(&mut out).unwrap(); - Some(out.into()) + out.into() } #[cfg(test)] diff --git a/src/sources/aws_ecs_metrics/parser.rs b/src/sources/aws_ecs_metrics/parser.rs index e3dc0c0494328..a599b899722c3 100644 --- a/src/sources/aws_ecs_metrics/parser.rs +++ b/src/sources/aws_ecs_metrics/parser.rs @@ -496,8 +496,8 @@ mod test { Utc.ymd(2018, 11, 14).and_hms_nano(8, 9, 10, 11) } - fn namespace() -> Option { - Some("aws_ecs".into()) + fn namespace() -> String { + "aws_ecs".into() } #[test] @@ -535,14 +535,14 @@ mod test { }"##; assert_eq!( - parse(json.as_bytes(), namespace()).unwrap(), + parse(json.as_bytes(), Some(namespace())).unwrap(), vec![ Metric::new( "blkio_recursive_io_service_bytes_total", MetricKind::Absolute, MetricValue::Counter { value: 0.0 }, ) - .with_namespace(namespace()) + .with_namespace(Some(namespace())) .with_tags(Some( vec![ ("device".into(), "202:26368".into()), @@ -562,7 +562,7 @@ mod test { MetricKind::Absolute, MetricValue::Counter { value: 520192.0 }, ) - .with_namespace(namespace()) + .with_namespace(Some(namespace())) .with_tags(Some( vec![ ("device".into(), "202:26368".into()), @@ -613,14 +613,14 @@ mod test { }"##; assert_eq!( - parse(json.as_bytes(), namespace()).unwrap(), + parse(json.as_bytes(), Some(namespace())).unwrap(), vec![ Metric::new( "cpu_online_cpus", MetricKind::Absolute, MetricValue::Gauge { value: 2.0 }, ) - .with_namespace(namespace()) + .with_namespace(Some(namespace())) .with_tags(Some( vec![ ( @@ -640,7 +640,7 @@ mod test { value: 2007130000000.0 }, ) - .with_namespace(namespace()) + .with_namespace(Some(namespace())) .with_tags(Some( vec![ ( @@ -658,7 +658,7 @@ mod test { MetricKind::Absolute, MetricValue::Counter { value: 510000000.0 }, ) - .with_namespace(namespace()) + .with_namespace(Some(namespace())) .with_tags(Some( vec![ ( @@ -676,7 +676,7 @@ mod test { MetricKind::Absolute, MetricValue::Counter { value: 190000000.0 }, ) - .with_namespace(namespace()) + .with_namespace(Some(namespace())) .with_tags(Some( vec![ ( @@ -696,7 +696,7 @@ mod test { value: 2324920942.0 }, ) - .with_namespace(namespace()) + .with_namespace(Some(namespace())) .with_tags(Some( vec![ ( @@ -714,7 +714,7 @@ mod test { MetricKind::Absolute, MetricValue::Counter { value: 0.0 }, ) - .with_namespace(namespace()) + .with_namespace(Some(namespace())) .with_tags(Some( vec![ ( @@ -732,7 +732,7 @@ mod test { MetricKind::Absolute, MetricValue::Counter { value: 0.0 }, ) - .with_namespace(namespace()) + .with_namespace(Some(namespace())) .with_tags(Some( vec![ ( @@ -750,7 +750,7 @@ mod test { MetricKind::Absolute, MetricValue::Counter { value: 0.0 }, ) - .with_namespace(namespace()) + .with_namespace(Some(namespace())) .with_tags(Some( vec![ ( @@ -770,7 +770,7 @@ mod test { value: 1095931487.0 }, ) - .with_namespace(namespace()) + .with_namespace(Some(namespace())) .with_tags(Some( vec![ ("cpu".into(), "0".into()), @@ -791,7 +791,7 @@ mod test { value: 1228989455.0 }, ) - .with_namespace(namespace()) + .with_namespace(Some(namespace())) .with_tags(Some( vec![ ("cpu".into(), "1".into()), @@ -859,7 +859,7 @@ mod test { } }"##; - let metrics = parse(json.as_bytes(), namespace()).unwrap(); + let metrics = parse(json.as_bytes(), Some(namespace())).unwrap(); assert_eq!( metrics @@ -871,7 +871,7 @@ mod test { MetricKind::Absolute, MetricValue::Gauge { value: 40120320.0 }, ) - .with_namespace(namespace()) + .with_namespace(Some(namespace())) .with_tags(Some( vec![ ( @@ -896,7 +896,7 @@ mod test { MetricKind::Absolute, MetricValue::Gauge { value: 47177728.0 }, ) - .with_namespace(namespace()) + .with_namespace(Some(namespace())) .with_tags(Some( vec![ ( @@ -921,7 +921,7 @@ mod test { MetricKind::Absolute, MetricValue::Gauge { value: 34885632.0 }, ) - .with_namespace(namespace()) + .with_namespace(Some(namespace())) .with_tags(Some( vec![ ( @@ -946,7 +946,7 @@ mod test { MetricKind::Absolute, MetricValue::Counter { value: 31131.0 }, ) - .with_namespace(namespace()) + .with_namespace(Some(namespace())) .with_tags(Some( vec![ ( @@ -985,7 +985,7 @@ mod test { } }"##; - let metrics = parse(json.as_bytes(), namespace()).unwrap(); + let metrics = parse(json.as_bytes(), Some(namespace())).unwrap(); assert_eq!( metrics @@ -997,7 +997,7 @@ mod test { MetricKind::Absolute, MetricValue::Counter { value: 329932716.0 }, ) - .with_namespace(namespace()) + .with_namespace(Some(namespace())) .with_tags(Some( vec![ ("device".into(), "eth1".into()), @@ -1023,7 +1023,7 @@ mod test { MetricKind::Absolute, MetricValue::Counter { value: 2001229.0 }, ) - .with_namespace(namespace()) + .with_namespace(Some(namespace())) .with_tags(Some( vec![ ("device".into(), "eth1".into()), diff --git a/src/sources/journald.rs b/src/sources/journald.rs index 3b5f2c65f3b9a..e8e10a99e8c53 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -391,18 +391,16 @@ fn create_event(record: Record) -> Event { log.insert(log_schema().host_key(), host); } // Translate the timestamp, and so leave both old and new names. - if let Some(timestamp) = log + if let Some(Value::Bytes(timestamp)) = log .get(&*SOURCE_TIMESTAMP) .or_else(|| log.get(RECEIVED_TIMESTAMP)) { - if let Value::Bytes(timestamp) = timestamp { - if let Ok(timestamp) = String::from_utf8_lossy(×tamp).parse::() { - let timestamp = chrono::Utc.timestamp( - (timestamp / 1_000_000) as i64, - (timestamp % 1_000_000) as u32 * 1_000, - ); - log.insert(log_schema().timestamp_key(), Value::Timestamp(timestamp)); - } + if let Ok(timestamp) = String::from_utf8_lossy(×tamp).parse::() { + let timestamp = chrono::Utc.timestamp( + (timestamp / 1_000_000) as i64, + (timestamp % 1_000_000) as u32 * 1_000, + ); + log.insert(log_schema().timestamp_key(), Value::Timestamp(timestamp)); } } // Add source type @@ -629,7 +627,7 @@ mod tests { impl FakeJournal { fn new( checkpoint: &Option, - ) -> crate::Result<(BoxStream<'static, io::Result>, StopJournalctlFn)> { + ) -> (BoxStream<'static, io::Result>, StopJournalctlFn) { let cursor = Cursor::new(FAKE_JOURNAL); let reader = BufReader::new(cursor); let mut journal = FakeJournal { reader }; @@ -642,7 +640,7 @@ mod tests { } } - Ok((Box::pin(journal), Box::new(|| ()))) + (Box::pin(journal), Box::new(|| ())) } } @@ -676,7 +674,10 @@ mod tests { remap_priority: true, out: tx, } - .run_shutdown(shutdown, Box::new(FakeJournal::new)); + .run_shutdown( + shutdown, + Box::new(|checkpoint| Ok(FakeJournal::new(checkpoint))), + ); tokio::spawn(source); delay_for(Duration::from_millis(100)).await; diff --git a/src/sources/prometheus/parser.rs b/src/sources/prometheus/parser.rs index 9d9696196a09f..b6c5786bf8a79 100644 --- a/src/sources/prometheus/parser.rs +++ b/src/sources/prometheus/parser.rs @@ -23,14 +23,14 @@ fn utc_timestamp(timestamp: Option) -> Option> { } pub(super) fn parse_text(packet: &str) -> Result, ParserError> { - reparse_groups(prometheus_parser::parse_text(packet)?) + prometheus_parser::parse_text(packet).map(reparse_groups) } pub(super) fn parse_request(request: proto::WriteRequest) -> Result, ParserError> { - reparse_groups(prometheus_parser::parse_request(request)?) + prometheus_parser::parse_request(request).map(reparse_groups) } -fn reparse_groups(groups: Vec) -> Result, ParserError> { +fn reparse_groups(groups: Vec) -> Vec { let mut result = Vec::new(); for group in groups { @@ -128,7 +128,7 @@ fn reparse_groups(groups: Vec) -> Result, ParserError> { } } - Ok(result) + result } #[cfg(test)] diff --git a/src/sources/prometheus/scrape.rs b/src/sources/prometheus/scrape.rs index d39e1b5c4316f..b5b0bd4ca1ca9 100644 --- a/src/sources/prometheus/scrape.rs +++ b/src/sources/prometheus/scrape.rs @@ -435,7 +435,7 @@ mod integration_tests { // Sample some well-known metrics let build = find_metric("prometheus_build_info"); assert!(matches!(build.data.kind, MetricKind::Absolute)); - assert!(matches!(build.data.value, MetricValue::Gauge { ..})); + assert!(matches!(build.data.value, MetricValue::Gauge { .. })); assert!(build.tags().unwrap().contains_key("branch")); assert!(build.tags().unwrap().contains_key("version")); diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index a3495425ba5bf..cf63043398794 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -851,7 +851,7 @@ mod test { #[test] fn parses_unix_datagram_config() { let config = parses_unix_config("unix_datagram"); - assert!(matches!(config.mode,Mode::UnixDatagram { .. })); + assert!(matches!(config.mode, Mode::UnixDatagram { .. })); } ////////////// UNIX STREAM TESTS ////////////// @@ -890,13 +890,13 @@ mod test { #[test] fn parses_new_unix_stream_config() { let config = parses_unix_config("unix_stream"); - assert!(matches!(config.mode,Mode::UnixStream { .. })); + assert!(matches!(config.mode, Mode::UnixStream { .. })); } #[cfg(unix)] #[test] fn parses_old_unix_stream_config() { let config = parses_unix_config("unix"); - assert!(matches!(config.mode,Mode::UnixStream { .. })); + assert!(matches!(config.mode, Mode::UnixStream { .. })); } } diff --git a/src/sources/socket/unix.rs b/src/sources/socket/unix.rs index cb2489a04017d..5525fa4a2a074 100644 --- a/src/sources/socket/unix.rs +++ b/src/sources/socket/unix.rs @@ -40,7 +40,7 @@ impl UnixConfig { * Function to pass to build_unix_*_source, specific to the basic unix source. * Takes a single line of a received message and builds an Event object. **/ -fn build_event(host_key: &str, received_from: Option, line: &str) -> Option { +fn build_event(host_key: &str, received_from: Option, line: &str) -> Event { let byte_size = line.len(); let mut event = Event::from(line); event.as_mut_log().insert( @@ -54,7 +54,7 @@ fn build_event(host_key: &str, received_from: Option, line: &str) -> Opti byte_size, mode: SocketMode::Unix }); - Some(event) + event } pub(super) fn unix_datagram( @@ -71,7 +71,7 @@ pub(super) fn unix_datagram( LinesCodec::new_with_max_length(max_length), shutdown, out, - build_event, + |host_key, received_from, line| Some(build_event(host_key, received_from, line)), ) } @@ -88,6 +88,6 @@ pub(super) fn unix_stream( host_key, shutdown, out, - build_event, + |host_key, received_from, line| Some(build_event(host_key, received_from, line)), ) } diff --git a/src/sources/syslog.rs b/src/sources/syslog.rs index 73e7a7c220022..5b829b78ad102 100644 --- a/src/sources/syslog.rs +++ b/src/sources/syslog.rs @@ -154,7 +154,7 @@ impl SourceConfig for SyslogConfig { host_key, shutdown, out, - event_from_str, + |host_key, default_host, line| Some(event_from_str(host_key, default_host, line)), )), } } @@ -192,7 +192,7 @@ impl TcpSource for SyslogTcpSource { } fn build_event(&self, frame: String, host: Bytes) -> Option { - event_from_str(&self.host_key, Some(host), &frame) + Some(event_from_str(&self.host_key, Some(host), &frame)) } } @@ -336,9 +336,7 @@ pub fn udp( std::str::from_utf8(&bytes) .map_err(|error| emit!(SyslogUdpUtf8Error { error })) .ok() - .and_then(|s| { - event_from_str(&host_key, Some(received_from), s).map(Ok) - }) + .map(|s| Ok(event_from_str(&host_key, Some(received_from), s))) } Err(error) => { emit!(SyslogUdpReadError { error }); @@ -374,7 +372,7 @@ fn resolve_year((month, _date, _hour, _min, _sec): IncompleteDate) -> i32 { // TODO: many more cases to handle: // octet framing (i.e. num bytes as ascii string prefix) with and without delimiters // null byte delimiter in place of newline -fn event_from_str(host_key: &str, default_host: Option, line: &str) -> Option { +fn event_from_str(host_key: &str, default_host: Option, line: &str) -> Event { let line = line.trim(); let parsed = syslog_loose::parse_message_with_year(line, resolve_year); let mut event = Event::from(&parsed.msg[..]); @@ -412,7 +410,7 @@ fn event_from_str(host_key: &str, default_host: Option, line: &str) -> Op event = ?event ); - Some(event) + event } fn insert_fields_from_syslog(event: &mut Event, parsed: Message<&str>) { @@ -622,10 +620,7 @@ mod test { expected.insert("procid", 8449); } - assert_eq!( - event_from_str(&"host".to_string(), None, &raw).unwrap(), - expected - ); + assert_eq!(event_from_str(&"host".to_string(), None, &raw), expected); } #[test] @@ -654,7 +649,7 @@ mod test { } let event = event_from_str(&"host".to_string(), None, &raw); - assert_eq!(event, Some(expected.clone())); + assert_eq!(event, expected); let raw = format!( r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - {} {}"#, @@ -662,7 +657,7 @@ mod test { ); let event = event_from_str(&"host".to_string(), None, &raw); - assert_eq!(event, Some(expected)); + assert_eq!(event, expected); } #[test] @@ -680,7 +675,7 @@ mod test { r#"[empty]"# ); - let event = event_from_str(&"host".to_string(), None, &msg).unwrap(); + let event = event_from_str(&"host".to_string(), None, &msg); assert!(there_is_map_called_empty(event)); let msg = format!( @@ -688,7 +683,7 @@ mod test { r#"[non_empty x="1"][empty]"# ); - let event = event_from_str(&"host".to_string(), None, &msg).unwrap(); + let event = event_from_str(&"host".to_string(), None, &msg); assert!(there_is_map_called_empty(event)); let msg = format!( @@ -696,7 +691,7 @@ mod test { r#"[empty][non_empty x="1"]"# ); - let event = event_from_str(&"host".to_string(), None, &msg).unwrap(); + let event = event_from_str(&"host".to_string(), None, &msg); assert!(there_is_map_called_empty(event)); let msg = format!( @@ -704,7 +699,7 @@ mod test { r#"[empty not_really="testing the test"]"# ); - let event = event_from_str(&"host".to_string(), None, &msg).unwrap(); + let event = event_from_str(&"host".to_string(), None, &msg); assert!(!there_is_map_called_empty(event)); } @@ -717,8 +712,8 @@ mod test { let cleaned = r#"<13>1 2019-02-13T19:48:34+00:00 74794bfb6795 root 8449 - [meta sequenceId="1"] i am foobar"#; assert_eq!( - event_from_str(&"host".to_string(), None, raw).unwrap(), - event_from_str(&"host".to_string(), None, cleaned).unwrap() + event_from_str(&"host".to_string(), None, raw), + event_from_str(&"host".to_string(), None, cleaned) ); } @@ -726,7 +721,7 @@ mod test { fn syslog_ng_default_network() { let msg = "i am foobar"; let raw = format!(r#"<13>Feb 13 20:07:26 74794bfb6795 root[8539]: {}"#, msg); - let event = event_from_str(&"host".to_string(), None, &raw).unwrap(); + let event = event_from_str(&"host".to_string(), None, &raw); let mut expected = Event::from(msg); { @@ -756,7 +751,7 @@ mod test { r#"<190>Feb 13 21:31:56 74794bfb6795 liblogging-stdlog: [origin software="rsyslogd" swVersion="8.24.0" x-pid="8979" x-info="http://www.rsyslog.com"] {}"#, msg ); - let event = event_from_str(&"host".to_string(), None, &raw).unwrap(); + let event = event_from_str(&"host".to_string(), None, &raw); let mut expected = Event::from(msg); { @@ -811,9 +806,6 @@ mod test { expected.insert("origin.x-info", "http://www.rsyslog.com"); } - assert_eq!( - event_from_str(&"host".to_string(), None, &raw).unwrap(), - expected - ); + assert_eq!(event_from_str(&"host".to_string(), None, &raw), expected); } } diff --git a/src/test_util/stats.rs b/src/test_util/stats.rs index c50e6125887f7..d7dbbab9c9ff0 100644 --- a/src/test_util/stats.rs +++ b/src/test_util/stats.rs @@ -163,8 +163,8 @@ impl WeightedSum { pub fn add(&mut self, value: f64, weight: f64) { self.total += value * weight; self.weights += weight; - self.max = opt_max(self.max, value); - self.min = opt_min(self.min, value); + self.max = Some(opt_max(self.max, value)); + self.min = Some(opt_min(self.min, value)); } pub fn mean(&self) -> Option { @@ -197,20 +197,20 @@ impl Display for WeightedSum { } } -fn opt_max(opt: Option, value: f64) -> Option { - Some(match opt { +fn opt_max(opt: Option, value: f64) -> f64 { + match opt { None => value, Some(s) if s > value => s, _ => value, - }) + } } -fn opt_min(opt: Option, value: f64) -> Option { - Some(match opt { +fn opt_min(opt: Option, value: f64) -> f64 { + match opt { None => value, Some(s) if s < value => s, _ => value, - }) + } } /// A TimeWeightedSum is a wrapper around WeightedSum that keeps track diff --git a/src/tls/settings.rs b/src/tls/settings.rs index c29b02c925ddd..a5b7f51578567 100644 --- a/src/tls/settings.rs +++ b/src/tls/settings.rs @@ -434,7 +434,7 @@ fn der_or_pem(data: Vec, der_fn: impl Fn(Vec) -> T, pem_fn: impl Fn(S /// inline data and is used directly instead of opening a file. fn open_read(filename: &Path, note: &'static str) -> Result<(Vec, PathBuf)> { if let Some(filename) = filename.to_str() { - if filename.find(PEM_START_MARKER).is_some() { + if filename.contains(PEM_START_MARKER) { return Ok((Vec::from(filename), "inline text".into())); } } diff --git a/src/transforms/aws_ec2_metadata.rs b/src/transforms/aws_ec2_metadata.rs index e6d597eb8f45d..d16494e6e1781 100644 --- a/src/transforms/aws_ec2_metadata.rs +++ b/src/transforms/aws_ec2_metadata.rs @@ -180,12 +180,12 @@ impl TaskTransform for Ec2MetadataTransform { Self: 'static, { let mut inner = self; - Box::pin(task.filter_map(move |event| ready(inner.transform_one(event)))) + Box::pin(task.filter_map(move |event| ready(Some(inner.transform_one(event))))) } } impl Ec2MetadataTransform { - fn transform_one(&mut self, mut event: Event) -> Option { + fn transform_one(&mut self, mut event: Event) -> Event { let log = event.as_mut_log(); if let Some(read_ref) = self.state.read() { @@ -196,7 +196,7 @@ impl Ec2MetadataTransform { }); } - Some(event) + event } }