diff --git a/.cargo/config b/.cargo/config deleted file mode 100644 index 174955f..0000000 --- a/.cargo/config +++ /dev/null @@ -1,5 +0,0 @@ -[target.x86_64-unknown-linux-musl] -rustflags = ["-C", "linker=rust-lld"] - -[target.x86_64-pc-windows-msvc] -rustflags = ["-C", "target-feature=+crt-static"] \ No newline at end of file diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..e932fc0 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,8 @@ +[target.x86_64-unknown-linux-musl] +rustflags = ["-C", "linker=rust-lld"] + +[target.x86_64-pc-windows-msvc] +rustflags = ["-C", "target-feature=+crt-static"] + +[target.aarch64-unknown-linux-musl] +rustflags = ["-C", "linker=rust-lld"] diff --git a/Cargo.lock b/Cargo.lock index 2289be9..7065b8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,23 +2,89 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "adler32" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "autocfg" -version = "1.0.1" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" + +[[package]] +name = "backtrace" +version = "0.3.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + +[[package]] +name = "bitflags" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" + +[[package]] +name = "bumpalo" +version = "3.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytes" -version = "1.0.1" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040" +checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" + +[[package]] +name = "cc" +version = "1.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fb8dd288a69fc53a1996d7ecfbf4a20d59065bff137ce7e56bbd620de191189" +dependencies = [ + "shlex", +] [[package]] name = "cfg-if" @@ -28,32 +94,39 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.19" +version = "0.4.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" dependencies = [ - "libc", - "num-integer", + "android-tzdata", + "iana-time-zone", + "js-sys", "num-traits", "serde", - "time", - "winapi", + "wasm-bindgen", + "windows-targets", ] +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + [[package]] name = "crc32fast" -version = "1.2.1" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81156fece84ab6a9f2afdb109ce3ae577e42b1228441eded99bd77f627953b1a" +checksum = "a97769d94ddab943e4510d138150169a2758b5ef3eb191a9ee688de3e23ef7b3" dependencies = [ "cfg-if", ] [[package]] name = "futures" -version = "0.3.15" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e7e43a803dae2fa37c1f6a8fe121e1f7bf9548b4dfc0522a42f34145dadfc27" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", @@ -66,9 +139,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.15" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e682a68b29a882df0545c143dc3646daefe80ba479bcdede94d5a703de2871e2" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", "futures-sink", @@ -76,15 +149,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.15" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0402f765d8a89a26043b889b26ce3c4679d268fa6bb22cd7c6aad98340e179d1" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-executor" -version = "0.3.15" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "badaa6a909fac9e7236d0620a2f57f7664640c56575b71a7552fbd68deafab79" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" dependencies = [ "futures-core", "futures-task", @@ -93,18 +166,16 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.15" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acc499defb3b348f8d8f3f66415835a9131856ff7714bf10dadfc4ec4bdb29a1" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" [[package]] name = "futures-macro" -version = "0.3.15" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4c40298486cdf52cc00cd6d6987892ba502c7656a16a4192a9992b1ccedd121" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ - "autocfg", - "proc-macro-hack", "proc-macro2", "quote", "syn", @@ -112,23 +183,22 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.15" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a57bead0ceff0d6dde8f465ecd96c9338121bb7717d3e7b108059531870c4282" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.15" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a16bef9fc1a4dddb5bee51c989e3fbba26569cbb0e31f5b303c184e3dd33dae" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-util" -version = "0.3.15" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "feb5c238d27e2bf94ffdfd27b2c29e3df4a68c4193bb6427384259e2bf191967" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ - "autocfg", "futures-channel", "futures-core", "futures-io", @@ -138,43 +208,76 @@ dependencies = [ "memchr", "pin-project-lite", "pin-utils", - "proc-macro-hack", - "proc-macro-nested", "slab", ] +[[package]] +name = "gimli" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" + [[package]] name = "hermit-abi" -version = "0.1.19" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + +[[package]] +name = "iana-time-zone" +version = "0.1.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" dependencies = [ - "libc", + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", ] [[package]] name = "inlinable_string" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3094308123a0e9fd59659ce45e22de9f53fc1d2ac6e1feb9fef988e4f76cad77" +checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" [[package]] name = "itoa" -version = "0.4.7" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" + +[[package]] +name = "js-sys" +version = "0.3.70" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a" +dependencies = [ + "wasm-bindgen", +] [[package]] name = "lazy_static" -version = "1.4.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.98" +version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "320cfe77175da3a483efed4bc0adc1968ca050b098ce4f2f1c13a56626128790" +checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" [[package]] name = "libflate" @@ -189,106 +292,115 @@ dependencies = [ ] [[package]] -name = "log" -version = "0.4.14" +name = "lock_api" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" dependencies = [ - "cfg-if", + "autocfg", + "scopeguard", ] +[[package]] +name = "log" +version = "0.4.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" + [[package]] name = "memchr" -version = "2.4.0" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] -name = "mio" -version = "0.7.13" +name = "miniz_oxide" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c2bdb6314ec10835cd3293dd268473a835c02b7b352e788be788b3c6ca6bb16" +checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08" dependencies = [ - "libc", - "log", - "miow", - "ntapi", - "winapi", + "adler", ] [[package]] -name = "miow" -version = "0.3.7" +name = "mio" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ - "winapi", + "hermit-abi", + "libc", + "wasi", + "windows-sys", ] [[package]] name = "new_debug_unreachable" -version = "1.0.4" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54" +checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" [[package]] -name = "ntapi" -version = "0.3.6" +name = "num-traits" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ - "winapi", + "autocfg", ] [[package]] -name = "num-integer" -version = "0.1.44" +name = "object" +version = "0.36.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" +checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9" dependencies = [ - "autocfg", - "num-traits", + "memchr", ] [[package]] -name = "num-traits" -version = "0.2.14" +name = "once_cell" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" -dependencies = [ - "autocfg", -] +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] -name = "num_cpus" -version = "1.13.0" +name = "parking_lot" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" dependencies = [ - "hermit-abi", - "libc", + "lock_api", + "parking_lot_core", ] [[package]] -name = "once_cell" -version = "1.8.0" +name = "parking_lot_core" +version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] [[package]] name = "phf_shared" -version = "0.8.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c00cf8b9eafe68dde5e9eaa2cef8ee84a9336a47d566ec55ca16589633b65af7" +checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096" dependencies = [ "siphasher", ] [[package]] name = "pin-project-lite" -version = "0.2.7" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d31d11c69a6b52a174b42bdc0c30e5e11670f90788b2c471c31c1d17d449443" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" [[package]] name = "pin-utils" @@ -302,59 +414,71 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c" -[[package]] -name = "proc-macro-hack" -version = "0.5.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" - -[[package]] -name = "proc-macro-nested" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" - [[package]] name = "proc-macro2" -version = "1.0.27" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0d8caf72986c1a598726adc988bb5984792ef84f5ee5aa50209145ee8077038" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" dependencies = [ - "unicode-xid", + "unicode-ident", ] [[package]] name = "quote" -version = "1.0.9" +version = "1.0.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" dependencies = [ "proc-macro2", ] +[[package]] +name = "redox_syscall" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4" +dependencies = [ + "bitflags", +] + [[package]] name = "rle-decode-fast" -version = "1.0.1" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" + +[[package]] +name = "rustc-demangle" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cabe4fa914dec5870285fa7f71f602645da47c486e68486d2b4ceb4a343e90ac" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] name = "ryu" -version = "1.0.5" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "serde" -version = "1.0.126" +version = "1.0.207" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03" +checksum = "5665e14a49a4ea1b91029ba7d3bca9f299e1f7cfa194388ccc20f14743e784f2" +dependencies = [ + "serde_derive", +] [[package]] name = "serde_derive" -version = "1.0.126" +version = "1.0.207" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "963a7dbc9895aeac7ac90e74f34a5d5261828f79df35cbed41e10189d3804d43" +checksum = "6aea2634c86b0e8ef2cfdc0c340baede54ec27b1e46febd7f80dffb2aa44a00e" dependencies = [ "proc-macro2", "quote", @@ -363,35 +487,61 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.64" +version = "1.0.125" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "799e97dc9fdae36a5c8b8f2cae9ce2ee9fdce2058c57a93e6099d919fd982f79" +checksum = "83c8e735a073ccf5be70aa8066aa984eaf2fa000db6c8d0100ae605b366d31ed" dependencies = [ "itoa", + "memchr", "ryu", "serde", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" -version = "1.4.0" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" dependencies = [ "libc", ] [[package]] name = "siphasher" -version = "0.3.5" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbce6d4507c7e4a3962091436e56e95290cb71fa302d0d270e32130b75fbff27" +checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" [[package]] name = "slab" -version = "0.4.3" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f173ac3d1a7e3b28003f40de0b5ce7fe2710f9b9dc3fc38664cebee46b3b6527" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + +[[package]] +name = "socket2" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" +dependencies = [ + "libc", + "windows-sys", +] [[package]] name = "squiflog" @@ -414,12 +564,13 @@ dependencies = [ [[package]] name = "string_cache" -version = "0.8.1" +version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ddb1139b5353f96e429e1a5e19fbaf663bddedaa06d1dbd49f82e352601209a" +checksum = "f91138e76242f575eb1d3b38b4f1362f10d3a43f47d182a5b359af488a02293b" dependencies = [ - "lazy_static", "new_debug_unreachable", + "once_cell", + "parking_lot", "phf_shared", "precomputed-hash", "serde", @@ -427,13 +578,13 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.73" +version = "2.0.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f71489ff30030d2ae598524f61326b902466f72a0fb1a8564c001cc63425bcc7" +checksum = "1fceb41e3d546d0bd83421d3409b1460cc7444cd389341a4c880fe7a042cb3d7" dependencies = [ "proc-macro2", "quote", - "unicode-xid", + "unicode-ident", ] [[package]] @@ -442,77 +593,179 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" -[[package]] -name = "time" -version = "0.1.44" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" -dependencies = [ - "libc", - "wasi", - "winapi", -] - [[package]] name = "tokio" -version = "1.8.1" +version = "1.39.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98c8b05dc14c75ea83d63dd391100353789f5f24b8b3866542a5e85c8be8e985" +checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" dependencies = [ - "autocfg", + "backtrace", "libc", "mio", - "num_cpus", - "once_cell", "pin-project-lite", "signal-hook-registry", - "winapi", + "socket2", + "windows-sys", ] [[package]] name = "tokio-util" -version = "0.6.7" +version = "0.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592" +checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", "futures-sink", - "log", "pin-project-lite", "tokio", ] [[package]] -name = "unicode-xid" -version = "0.2.2" +name = "unicode-ident" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] name = "wasi" -version = "0.10.0+wasi-snapshot-preview1" +version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] -name = "winapi" -version = "0.3.9" +name = "wasm-bindgen" +version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5" dependencies = [ - "winapi-i686-pc-windows-gnu", - "winapi-x86_64-pc-windows-gnu", + "cfg-if", + "once_cell", + "wasm-bindgen-macro", ] [[package]] -name = "winapi-i686-pc-windows-gnu" -version = "0.4.0" +name = "wasm-bindgen-backend" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" + +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" [[package]] -name = "winapi-x86_64-pc-windows-gnu" -version = "0.4.0" +name = "windows_x86_64_msvc" +version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" diff --git a/Cargo.toml b/Cargo.toml index 50ec5b4..e2f78f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ -[workspace] -members = ["squiflog"] - -[profile.release] -debug = true +[workspace] +members = ["squiflog"] + +[profile.release] +debug = true diff --git a/README.md b/README.md index b121ff4..9a572b9 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# `squiflog` +# `Seq.Input.Syslog` and `datalust/seq-input-syslog` Ingest syslog [RFC 5424](https://tools.ietf.org/html/rfc5424) and [RFC 3164](https://tools.ietf.org/html/rfc3164) messages via UDP into [Seq](https://datalust.co/seq). @@ -75,4 +75,4 @@ $ docker run \ ``` In this case the `syslog-address` option needs to resolve to the running `seq-input-syslog` container. -**Important note:** providing the `--log-opt syslog-format=rfc5424` enables the stricter and more informative RFC 5424 Syslog format. Leaving this unset may default to the earlier RFC 3164 format. +**Important note:** providing the `--log-opt syslog-format=rfc5424` enables the stricter and more informative RFC 5424 syslog format. Leaving this unset may default to the earlier RFC 3164 format. diff --git a/Seq.Input.Syslog.nuspec b/Seq.Input.Syslog.nuspec index 53f24cf..878344f 100644 --- a/Seq.Input.Syslog.nuspec +++ b/Seq.Input.Syslog.nuspec @@ -14,7 +14,8 @@ - - + + + diff --git a/appveyor.yml b/appveyor.yml index 9b1c443..5e3efc0 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -6,8 +6,8 @@ image: test: off environment: - CLI_VERSION: '5.1.274' - RUST_TOOLCHAIN: '1.53.0' + CLI_VERSION: '2024.3.873' + RUST_TOOLCHAIN: '1.80.0' DOCKER_TOKEN: secure: QKr2YEuliXdFKe3jN7w97w== DOCKER_USER: @@ -32,7 +32,7 @@ for: deploy: - provider: NuGet api_key: - secure: gY0WYVg5+Cgr83nsqbIeHkQa1icLD+MMsimgITGny0Ix3EWrE+ykWdGOEp/ByuYR + secure: kpqwuq5o4VPX6bcnNkpF8z2rUQ2P4z3A4GcHG21Z/Rz9hAlJ/Qfaf1479VE+hADW skip_symbols: true artifact: /Seq.Input.Syslog.*\.nupkg/ on: diff --git a/ci/build-deps.ps1 b/ci/build-deps.ps1 index b2d5b99..e5446d5 100644 --- a/ci/build-deps.ps1 +++ b/ci/build-deps.ps1 @@ -68,6 +68,7 @@ function Invoke-LinuxBuild Write-BeginStep $MYINVOCATION Run-Command -Exe cargo -ArgumentList 'build', '--bin squiflog', '--release', '--target x86_64-unknown-linux-musl' + Run-Command -Exe cargo -ArgumentList 'build', '--bin squiflog', '--release', '--target aarch64-unknown-linux-musl' } function Invoke-LinuxTests { diff --git a/ci/linux-x64/setup.sh b/ci/linux-x64/setup.sh index d9e85b5..2a08cc6 100755 --- a/ci/linux-x64/setup.sh +++ b/ci/linux-x64/setup.sh @@ -7,6 +7,7 @@ curl https://sh.rustup.rs -sSf | sh -s -- --default-host x86_64-unknown-linux-gn export PATH="$HOME/.cargo/bin:$PATH" rustup target add x86_64-unknown-linux-musl +rustup target add aarch64-unknown-linux-musl ls /home/appveyor ls /home/appveyor/.cargo diff --git a/ci/win-x64/setup.ps1 b/ci/win-x64/setup.ps1 index 6c5d165..a39dd18 100644 --- a/ci/win-x64/setup.ps1 +++ b/ci/win-x64/setup.ps1 @@ -12,4 +12,7 @@ $env:Path = "C:\Users\appveyor\.cargo\bin;$env:Path" & rustup target add x86_64-unknown-linux-musl if ($LASTEXITCODE) { exit 1 } +& rustup target add aarch64-unknown-linux-musl +if ($LASTEXITCODE) { exit 1 } + $ErrorActionPreference = "Stop" diff --git a/seq-input-syslog.d.json b/seq-input-syslog.d.json index 3d47f60..c187d51 100644 --- a/seq-input-syslog.d.json +++ b/seq-input-syslog.d.json @@ -9,6 +9,9 @@ }, "linux-x64": { "executable": "linux-x64/squiflog" + }, + "linux-arm64": { + "executable": "linux-arm64/squiflog" } }, "settings": { diff --git a/squiflog/Cargo.toml b/squiflog/Cargo.toml index 1c9d6a4..fcc24f4 100644 --- a/squiflog/Cargo.toml +++ b/squiflog/Cargo.toml @@ -23,12 +23,12 @@ libflate = "0.1" version = "0.3" [dependencies.pin-utils] -version = "0.1.0-alpha.4" +version = "0.1" [dependencies.tokio] version = "1" features = ["signal", "sync", "net", "time", "rt", "rt-multi-thread"] [dependencies.tokio-util] -version = "0.6" +version = "0.7" features = ["codec", "net"] diff --git a/squiflog/src/config.rs b/squiflog/src/config.rs index ce27102..99b5c0b 100644 --- a/squiflog/src/config.rs +++ b/squiflog/src/config.rs @@ -1,72 +1,80 @@ -use std::{env, str::FromStr}; - -use crate::{data, diagnostics, error::Error, server}; - -#[derive(Debug, Default, Clone)] -pub struct Config { - pub data: data::Config, - pub server: server::Config, - pub diagnostics: diagnostics::Config, -} - -impl Config { - pub fn from_env() -> Result { - let mut config = Config::default(); - let is_seq_app = is_seq_app(); - - let bind_address_var = if is_seq_app { - "SEQ_APP_SETTING_SYSLOGADDRESS" - } else { - "SYSLOG_ADDRESS" - }; - read_environment(&mut config.server.bind, bind_address_var)?; - - let enable_diagnostics = if is_seq_app { - "SEQ_APP_SETTING_ENABLEDIAGNOSTICS" - } else { - "SYSLOG_ENABLE_DIAGNOSTICS" - }; - if is_truthy(enable_diagnostics)? { - config.diagnostics.min_level = diagnostics::Level::Debug; - } - - Ok(config) - } -} - -pub fn is_seq_app() -> bool { - env::var("SEQ_APP_ID").is_ok() -} - -fn is_truthy(name: impl AsRef) -> Result { - match env::var(name.as_ref()) { - // The evironment variable contains a truthy value - Ok(ref v) if v == "True" || v == "true" => return Ok(true), - // The environment variable is not set or doesn't contain - // a truthy value - Ok(_) | Err(env::VarError::NotPresent) => return Ok(false), - // The environment variable is invalid - Err(e) => Err(e)?, - } -} - -fn read_environment(into: &mut T, name: impl AsRef) -> Result<(), Error> -where - T: FromStr, - Error: From, -{ - match env::var(name.as_ref()) { - // The environment variable exists, but is empty - Ok(ref v) if v == "" => return Ok(()), - // The environment variable does not exist - Err(env::VarError::NotPresent) => return Ok(()), - // The environment variable is invalid - Err(e) => Err(e)?, - // The environment variable has a value - Ok(v) => { - *into = T::from_str(&v)?; - - Ok(()) - } - } -} +use std::{ + env, + str::FromStr, +}; + +use crate::{ + data, + diagnostics, + error::Error, + server, +}; + +#[derive(Debug, Default, Clone)] +pub struct Config { + pub data: data::Config, + pub server: server::Config, + pub diagnostics: diagnostics::Config, +} + +impl Config { + pub fn from_env() -> Result { + let mut config = Config::default(); + let is_seq_app = is_seq_app(); + + let bind_address_var = if is_seq_app { + "SEQ_APP_SETTING_SYSLOGADDRESS" + } else { + "SYSLOG_ADDRESS" + }; + read_environment(&mut config.server.bind, bind_address_var)?; + + let enable_diagnostics = if is_seq_app { + "SEQ_APP_SETTING_ENABLEDIAGNOSTICS" + } else { + "SYSLOG_ENABLE_DIAGNOSTICS" + }; + if is_truthy(enable_diagnostics)? { + config.diagnostics.min_level = diagnostics::Level::Debug; + } + + Ok(config) + } +} + +pub fn is_seq_app() -> bool { + env::var("SEQ_APP_ID").is_ok() +} + +fn is_truthy(name: impl AsRef) -> Result { + match env::var(name.as_ref()) { + // The evironment variable contains a truthy value + Ok(ref v) if v == "True" || v == "true" => return Ok(true), + // The environment variable is not set or doesn't contain + // a truthy value + Ok(_) | Err(env::VarError::NotPresent) => return Ok(false), + // The environment variable is invalid + Err(e) => Err(e)?, + } +} + +fn read_environment(into: &mut T, name: impl AsRef) -> Result<(), Error> +where + T: FromStr, + Error: From, +{ + match env::var(name.as_ref()) { + // The environment variable exists, but is empty + Ok(ref v) if v == "" => return Ok(()), + // The environment variable does not exist + Err(env::VarError::NotPresent) => return Ok(()), + // The environment variable is invalid + Err(e) => Err(e)?, + // The environment variable has a value + Ok(v) => { + *into = T::from_str(&v)?; + + Ok(()) + } + } +} diff --git a/squiflog/src/data/clef.rs b/squiflog/src/data/clef.rs index 7181d66..b5c15c3 100644 --- a/squiflog/src/data/clef.rs +++ b/squiflog/src/data/clef.rs @@ -1,31 +1,37 @@ -use std::{borrow::Cow, collections::HashMap}; -use serde_json::Value; -use chrono::{DateTime, Utc}; - -#[derive(Debug, Serialize, Deserialize)] -pub struct Message<'a> { - #[serde(rename = "@t")] - pub timestamp: DateTime, - - #[serde(rename = "@l")] - pub level: Option<&'a str>, - - #[serde(rename = "@m")] - #[serde(skip_serializing_if = "Option::is_none")] - pub message: Option>, - - // @mt and @x are currently not used - #[serde(rename = "@mt")] - #[serde(skip_serializing_if = "Option::is_none")] - pub message_template: Option<&'a str>, - - #[serde(rename = "@x")] - #[serde(skip_serializing_if = "Option::is_none")] - pub exception: Option<&'a str>, - - // @i and @r are currently not implemented - - // Everything else - #[serde(flatten)] - pub additional: HashMap<&'a str, Value>, -} +use chrono::{ + DateTime, + Utc, +}; +use serde_json::Value; +use std::{ + borrow::Cow, + collections::HashMap, +}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct Message<'a> { + #[serde(rename = "@t")] + pub timestamp: DateTime, + + #[serde(rename = "@l")] + pub level: Option<&'a str>, + + #[serde(rename = "@m")] + #[serde(skip_serializing_if = "Option::is_none")] + pub message: Option>, + + // @mt and @x are currently not used + #[serde(rename = "@mt")] + #[serde(skip_serializing_if = "Option::is_none")] + pub message_template: Option<&'a str>, + + #[serde(rename = "@x")] + #[serde(skip_serializing_if = "Option::is_none")] + pub exception: Option<&'a str>, + + // @i and @r are currently not implemented + + // Everything else + #[serde(flatten)] + pub additional: HashMap<&'a str, Value>, +} diff --git a/squiflog/src/data/mod.rs b/squiflog/src/data/mod.rs index f410c36..c667a6a 100644 --- a/squiflog/src/data/mod.rs +++ b/squiflog/src/data/mod.rs @@ -1,266 +1,266 @@ -use std::{ - collections::HashMap, - io, - str, - io::Write -}; - -use serde_json::{ - self, - json, -}; - -use crate::error::Error; -use chrono::Utc; - -mod clef; -mod parsers; -pub mod syslog; - -metrics! { - msg -} - -/** -Configuration for CLEF formatting. -*/ -#[derive(Debug, Clone)] -pub struct Config {} - -impl Default for Config { - fn default() -> Self { - Config {} - } -} - -/** -Build a CLEF processor to handle messages. -*/ -pub fn build(config: Config) -> Data { - Data::new(config) -} - -#[derive(Clone)] -pub struct Data {} - -impl Data { - pub fn new(_: Config) -> Self { - Data {} - } - - pub fn read_as_clef(&self, msg: &[u8]) -> Result<(), Error> { - increment!(data.msg); - let syslog = syslog::Message::from_bytes(msg); - let clef = syslog.into_clef(); - let stdout = io::stdout(); - let mut stdout = stdout.lock(); - - serde_json::to_writer(&mut stdout, &clef)?; - stdout.write_all(b"\n")?; - - Ok(()) - } -} - -impl<'a> syslog::Message<'a> { - /** - Covert a SYSLOG message into CLEF. - - The contents of the SYSLOG message is inspected and deserialized as CLEF-encoded - JSON if possible. In this case, timestamp, message, and level information from - the embedded CLEF is given precedence over the SYSLOG header. - - Other fields with conflicting names are prioritized: - - SYSLOG header > SYSLOG structured data > SYSLOG message embedded CLEF/JSON - - This means fields set by the system/on the logger are preferred over - the fields attached to any one event. - - If fields conflict, then the lower-priority field is included with a - double-underscore-prefixed name, e.g.: "__host". - */ - pub fn into_clef(self) -> clef::Message<'a> { - #![deny(unused_variables)] - - let syslog::Message { - priority, - timestamp, - hostname, - app_name, - proc_id, - message_id, - structured_data, - message, - } = self; - - let mut additional = HashMap::new(); - - additional.insert("facility", json!(priority.facility())); - if let Some(hostname) = hostname { - additional.insert("hostname", json!(hostname)); - } - if let Some(app_name) = app_name { - additional.insert("app_name", json!(app_name)); - } - if let Some(proc_id) = proc_id { - additional.insert("proc_id", json!(proc_id)); - } - if let Some(message_id) = message_id { - additional.insert("message_id", json!(message_id)); - } - - if let Some(sd) = structured_data { - for element in sd { - let mut params = vec![]; - for (k, v) in element.params { - let mut map = HashMap::new(); - map.insert(k, v); - params.push(map); - } - additional.insert(element.id, json!(params)); - } - } - - clef::Message { - timestamp: timestamp.unwrap_or_else(|| Utc::now()), - level: Some(priority.severity()), - message, - message_template: None, - exception: None, - additional, - } - } -} - -#[cfg(test)] -mod test { - use super::*; - use serde_json::{ - self, - json, - }; - use std::borrow::Cow::Borrowed; - use crate::test_util::to_timestamp; - - #[test] - fn syslog_to_clef() { - let expected = json!({ - "@l": "info", - "@m": "hello world", - "@t": "2020-02-13T00:51:39.527825Z", - "facility": "daemon", - "hostname": "docker-desktop", - "app_name": "8b1089798cf8", - "proc_id": "1481", - "message_id": "8b1089798cf8", - }); - - let message = "hello world"; - - let syslog = syslog::Message { - priority: syslog::Priority { - facility: 3, - severity: 6, - }, - timestamp: to_timestamp("2020-02-13T00:51:39.527825Z"), - hostname: Some("docker-desktop"), - app_name: Some("8b1089798cf8"), - proc_id: Some("1481"), - message_id: Some("8b1089798cf8"), - structured_data: None, - message: Some(Borrowed(message)), - }; - - let clef = syslog.into_clef(); - let actual = serde_json::to_value(clef).unwrap(); - - assert_eq!(expected, actual); - } - - #[test] - fn syslog_to_clef_with_structured_data() { - let expected = json!({ - "@l": "info", - "@m": "hello world", - "@t": "2020-02-13T00:51:39.527825Z", - "facility": "daemon", - "hostname": "docker-desktop", - "app_name": "8b1089798cf8", - "proc_id": "1481", - "message_id": "8b1089798cf8", - "sdid1234": [{ "hello": "world" }, { "event": "value" }] - }); - - let message = "hello world"; - - let mut sd_params = vec![]; - sd_params.push(("hello", "world".to_owned())); - sd_params.push(("event", "value".to_owned())); - - let syslog = syslog::Message { - priority: syslog::Priority { - facility: 3, - severity: 6, - }, - timestamp: to_timestamp("2020-02-13T00:51:39.527825Z"), - hostname: Some("docker-desktop"), - app_name: Some("8b1089798cf8"), - proc_id: Some("1481"), - message_id: Some("8b1089798cf8"), - structured_data: Some(vec![syslog::StructuredDataElement { - id: "sdid1234", - params: sd_params, - }]), - message: Some(Borrowed(message)), - }; - - let clef = syslog.into_clef(); - let actual = serde_json::to_value(clef).unwrap(); - - assert_eq!(expected, actual); - } - - #[test] - fn syslog_to_clef_with_structured_data_with_duplicated_params() { - let expected = json!({ - "@l": "info", - "@m": "hello world", - "@t": "2020-02-13T00:51:39.527825Z", - "facility": "daemon", - "hostname": "docker-desktop", - "app_name": "8b1089798cf8", - "proc_id": "1481", - "message_id": "8b1089798cf8", - "sdid1234": [{ "ip": "192.0.2.1" }, { "ip": "192.0.2.129" }] - }); - - let message = "hello world"; - - let mut sd_params = vec![]; - sd_params.push(("ip", "192.0.2.1".to_owned())); - sd_params.push(("ip", "192.0.2.129".to_owned())); - - let syslog = syslog::Message { - priority: syslog::Priority { - facility: 3, - severity: 6, - }, - timestamp: to_timestamp("2020-02-13T00:51:39.527825Z"), - hostname: Some("docker-desktop"), - app_name: Some("8b1089798cf8"), - proc_id: Some("1481"), - message_id: Some("8b1089798cf8"), - structured_data: Some(vec![syslog::StructuredDataElement { - id: "sdid1234", - params: sd_params, - }]), - message: Some(Borrowed(message)), - }; - - let clef = syslog.into_clef(); - let actual = serde_json::to_value(clef).unwrap(); - - assert_eq!(expected, actual); - } -} +use std::{ + collections::HashMap, + io, + io::Write, + str, +}; + +use serde_json::{ + self, + json, +}; + +use crate::error::Error; +use chrono::Utc; + +mod clef; +mod parsers; +pub mod syslog; + +metrics! { + msg +} + +/** +Configuration for CLEF formatting. +*/ +#[derive(Debug, Clone)] +pub struct Config {} + +impl Default for Config { + fn default() -> Self { + Config {} + } +} + +/** +Build a CLEF processor to handle messages. +*/ +pub fn build(config: Config) -> Data { + Data::new(config) +} + +#[derive(Clone)] +pub struct Data {} + +impl Data { + pub fn new(_: Config) -> Self { + Data {} + } + + pub fn read_as_clef(&self, msg: &[u8]) -> Result<(), Error> { + increment!(data.msg); + let syslog = syslog::Message::from_bytes(msg); + let clef = syslog.into_clef(); + let stdout = io::stdout(); + let mut stdout = stdout.lock(); + + serde_json::to_writer(&mut stdout, &clef)?; + stdout.write_all(b"\n")?; + + Ok(()) + } +} + +impl<'a> syslog::Message<'a> { + /** + Covert a SYSLOG message into CLEF. + + The contents of the SYSLOG message is inspected and deserialized as CLEF-encoded + JSON if possible. In this case, timestamp, message, and level information from + the embedded CLEF is given precedence over the SYSLOG header. + + Other fields with conflicting names are prioritized: + + SYSLOG header > SYSLOG structured data > SYSLOG message embedded CLEF/JSON + + This means fields set by the system/on the logger are preferred over + the fields attached to any one event. + + If fields conflict, then the lower-priority field is included with a + double-underscore-prefixed name, e.g.: "__host". + */ + pub fn into_clef(self) -> clef::Message<'a> { + #![deny(unused_variables)] + + let syslog::Message { + priority, + timestamp, + hostname, + app_name, + proc_id, + message_id, + structured_data, + message, + } = self; + + let mut additional = HashMap::new(); + + additional.insert("facility", json!(priority.facility())); + if let Some(hostname) = hostname { + additional.insert("hostname", json!(hostname)); + } + if let Some(app_name) = app_name { + additional.insert("app_name", json!(app_name)); + } + if let Some(proc_id) = proc_id { + additional.insert("proc_id", json!(proc_id)); + } + if let Some(message_id) = message_id { + additional.insert("message_id", json!(message_id)); + } + + if let Some(sd) = structured_data { + for element in sd { + let mut params = vec![]; + for (k, v) in element.params { + let mut map = HashMap::new(); + map.insert(k, v); + params.push(map); + } + additional.insert(element.id, json!(params)); + } + } + + clef::Message { + timestamp: timestamp.unwrap_or_else(|| Utc::now()), + level: Some(priority.severity()), + message, + message_template: None, + exception: None, + additional, + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::test_util::to_timestamp; + use serde_json::{ + self, + json, + }; + use std::borrow::Cow::Borrowed; + + #[test] + fn syslog_to_clef() { + let expected = json!({ + "@l": "info", + "@m": "hello world", + "@t": "2020-02-13T00:51:39.527825Z", + "facility": "daemon", + "hostname": "docker-desktop", + "app_name": "8b1089798cf8", + "proc_id": "1481", + "message_id": "8b1089798cf8", + }); + + let message = "hello world"; + + let syslog = syslog::Message { + priority: syslog::Priority { + facility: 3, + severity: 6, + }, + timestamp: to_timestamp("2020-02-13T00:51:39.527825Z"), + hostname: Some("docker-desktop"), + app_name: Some("8b1089798cf8"), + proc_id: Some("1481"), + message_id: Some("8b1089798cf8"), + structured_data: None, + message: Some(Borrowed(message)), + }; + + let clef = syslog.into_clef(); + let actual = serde_json::to_value(clef).unwrap(); + + assert_eq!(expected, actual); + } + + #[test] + fn syslog_to_clef_with_structured_data() { + let expected = json!({ + "@l": "info", + "@m": "hello world", + "@t": "2020-02-13T00:51:39.527825Z", + "facility": "daemon", + "hostname": "docker-desktop", + "app_name": "8b1089798cf8", + "proc_id": "1481", + "message_id": "8b1089798cf8", + "sdid1234": [{ "hello": "world" }, { "event": "value" }] + }); + + let message = "hello world"; + + let mut sd_params = vec![]; + sd_params.push(("hello", "world".to_owned())); + sd_params.push(("event", "value".to_owned())); + + let syslog = syslog::Message { + priority: syslog::Priority { + facility: 3, + severity: 6, + }, + timestamp: to_timestamp("2020-02-13T00:51:39.527825Z"), + hostname: Some("docker-desktop"), + app_name: Some("8b1089798cf8"), + proc_id: Some("1481"), + message_id: Some("8b1089798cf8"), + structured_data: Some(vec![syslog::StructuredDataElement { + id: "sdid1234", + params: sd_params, + }]), + message: Some(Borrowed(message)), + }; + + let clef = syslog.into_clef(); + let actual = serde_json::to_value(clef).unwrap(); + + assert_eq!(expected, actual); + } + + #[test] + fn syslog_to_clef_with_structured_data_with_duplicated_params() { + let expected = json!({ + "@l": "info", + "@m": "hello world", + "@t": "2020-02-13T00:51:39.527825Z", + "facility": "daemon", + "hostname": "docker-desktop", + "app_name": "8b1089798cf8", + "proc_id": "1481", + "message_id": "8b1089798cf8", + "sdid1234": [{ "ip": "192.0.2.1" }, { "ip": "192.0.2.129" }] + }); + + let message = "hello world"; + + let mut sd_params = vec![]; + sd_params.push(("ip", "192.0.2.1".to_owned())); + sd_params.push(("ip", "192.0.2.129".to_owned())); + + let syslog = syslog::Message { + priority: syslog::Priority { + facility: 3, + severity: 6, + }, + timestamp: to_timestamp("2020-02-13T00:51:39.527825Z"), + hostname: Some("docker-desktop"), + app_name: Some("8b1089798cf8"), + proc_id: Some("1481"), + message_id: Some("8b1089798cf8"), + structured_data: Some(vec![syslog::StructuredDataElement { + id: "sdid1234", + params: sd_params, + }]), + message: Some(Borrowed(message)), + }; + + let clef = syslog.into_clef(); + let actual = serde_json::to_value(clef).unwrap(); + + assert_eq!(expected, actual); + } +} diff --git a/squiflog/src/data/parsers.rs b/squiflog/src/data/parsers.rs index 8bb87c7..1700ab4 100644 --- a/squiflog/src/data/parsers.rs +++ b/squiflog/src/data/parsers.rs @@ -1,6 +1,17 @@ -use crate::error::{Error, err_msg}; -use chrono::{Utc, DateTime, Local, Datelike, Timelike, TimeZone}; use crate::data::syslog::StructuredDataElement; +use crate::error::{ + err_msg, + Error, +}; +use chrono::{ + DateTime, + Datelike, + Local, + NaiveDateTime, + TimeZone, + Timelike, + Utc, +}; type ParserResult<'a, T> = Result<(T, &'a [u8]), Error>; @@ -78,7 +89,10 @@ pub fn iso8601_timestamp(i: &[u8]) -> ParserResult> { Ok((utc, rem)) } -pub fn loose_timestamp<'a, 'b>(i: &'a [u8], now: &'b DateTime) -> ParserResult<'a, DateTime> { +pub fn loose_timestamp<'a, 'b>( + i: &'a [u8], + now: &'b DateTime, +) -> ParserResult<'a, DateTime> { if let Ok((iso_ts, rem)) = iso8601_timestamp(i) { return Ok((iso_ts, rem)); } @@ -86,18 +100,28 @@ pub fn loose_timestamp<'a, 'b>(i: &'a [u8], now: &'b DateTime) -> ParserRes let (month_day_h_m_s, rem) = take(i, 15)?; let cheat_and_allocate_a_year = std::str::from_utf8(month_day_h_m_s)?.to_string() + " 1980"; - let local = Local.datetime_from_str(&cheat_and_allocate_a_year, "%h %d %H:%M:%S %Y")?; + let local = NaiveDateTime::parse_from_str(&cheat_and_allocate_a_year, "%h %d %H:%M:%S %Y")? + .and_local_timezone(Local) + .unwrap(); let year_offset = if &month_day_h_m_s[0..3] == &b"Dec"[..] && now.month() == 1 { - - 1 + -1 } else if &month_day_h_m_s[0..3] == &b"Jan"[..] && now.month() == 12 { 1 } else { 0 }; - let with_year = Local.ymd(now.year() + year_offset, local.month(), local.day()) - .and_hms(local.hour(), local.minute(), local.second()); + let with_year = Local + .with_ymd_and_hms( + now.year() + year_offset, + local.month(), + local.day(), + local.hour(), + local.minute(), + local.second(), + ) + .unwrap(); let utc = with_year.with_timezone(&Utc); Ok((utc, rem)) @@ -142,7 +166,7 @@ pub fn structured_data_element(i: &[u8]) -> ParserResult } let (_, rem) = byte(rem, b']')?; - Ok((StructuredDataElement{id, params}, rem)) + Ok((StructuredDataElement { id, params }, rem)) } pub fn param_value_content(i: &[u8]) -> ParserResult { @@ -205,10 +229,18 @@ mod tests { #[test] fn delimited_rejects_invalid_content() { - let cases = [&b"(test"[..], &b"test)"[..], &b" "[..], &b""[..], &b"("[..], &b")"[..]].to_vec(); + let cases = [ + &b"(test"[..], + &b"test)"[..], + &b" "[..], + &b""[..], + &b"("[..], + &b")"[..], + ] + .to_vec(); for case in cases { let expect_err = delimited(case, b'(', b')'); - assert!(expect_err.is_err(), case); + assert!(expect_err.is_err()); } } diff --git a/squiflog/src/data/syslog.rs b/squiflog/src/data/syslog.rs index d2da169..beea25d 100644 --- a/squiflog/src/data/syslog.rs +++ b/squiflog/src/data/syslog.rs @@ -1,474 +1,487 @@ -use crate::{ - error::{ - err_msg, - Error, - }, - data::parsers -}; -use std::borrow::Cow; -use chrono::{Utc, DateTime}; - -#[derive(Debug, Eq, PartialEq)] -pub struct Priority { - pub facility: u8, - pub severity: u8, -} - -impl Priority { - fn from_raw(raw: u8) -> Self { - let facility = raw / 8; - let severity = raw % 8; - - Priority { facility, severity } - } - - pub fn severity(&self) -> &'static str { - match self.severity { - 0 => "emerg", - 1 => "alert", - 2 => "crit", - 3 => "err", - 4 => "warning", - 5 => "notice", - 6 => "info", - _ => "debug", - } - } - - pub fn facility(&self) -> &'static str { - match self.facility { - 0 => "kern", - 1 => "user", - 2 => "mail", - 3 => "daemon", - 4 => "auth", - 5 => "syslog", - 6 => "lpr", - 7 => "news", - 8 => "uucp", - 9 => "cron", - 10 => "authpriv", - 11 => "ftp", - 12 => "ntp", - 13 => "security", - 14 => "console", - 15 => "solaris-cron", - 16 => "local0", - 17 => "local1", - 18 => "local2", - 19 => "local3", - 20 => "local4", - 21 => "local5", - 22 => "local6", - 23 => "local7", - _ => "unknown", - } - } -} - -#[derive(Debug, Eq, PartialEq)] -pub struct StructuredDataElement<'a> { - pub id: &'a str, - pub params: Vec<(&'a str, String)>, -} - -#[derive(Debug, Eq, PartialEq)] -pub struct Message<'a> { - pub priority: Priority, - pub timestamp: Option>, - pub hostname: Option<&'a str>, - pub app_name: Option<&'a str>, - pub proc_id: Option<&'a str>, - pub message_id: Option<&'a str>, - pub structured_data: Option>>, - pub message: Option>, -} - -impl<'a> Message<'a> { - pub fn from_str(s: &'a str) -> Self { - Self::from_bytes(s.as_bytes()) - } - - pub fn from_bytes(s: &'a [u8]) -> Self { - Self::from_rfc5424_bytes(s).unwrap_or_else(|_| Self::from_rfc3164_bytes(s, &Utc::now())) - } - - // RFC3164 format: TIMESTAMP HOSTNAME TAG: (MSG) - // We treat the tag as part of the message. - pub fn from_rfc3164_bytes(msg: &'a [u8], now: &DateTime) -> Self { - let mut unparsed = msg; - let mut result = Message { - priority: Priority::from_raw(13), - timestamp: None, - hostname: None, - app_name: None, - proc_id: None, - message_id: None, - structured_data: None, - message: None, - }; - - if let Ok((priority, rem)) = parsers::priority(unparsed) { - result.priority = Priority::from_raw(priority); - unparsed = rem; - - if let Ok((timestamp, rem)) = parsers::loose_timestamp(unparsed, now) { - result.timestamp = Some(timestamp); - unparsed = rem; - - if let Ok((_, rem)) = parsers::byte(unparsed, b' ') { - unparsed = rem; - - if let Ok((hostname, rem)) = parsers::header_item(unparsed, "hostname") { - result.hostname = hostname; - unparsed = rem; - } - } - } - } - - result.message = if unparsed.len() > 0 { Some(String::from_utf8_lossy(unparsed)) } else { None }; - - if result.timestamp.is_none() { - result.timestamp = Some(now.clone()) - } - - result - } - - // RFC5424 format: VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID STRUCTURED-DATA (MSG) - pub fn from_rfc5424_bytes(msg: &'a [u8]) -> Result { - let (priority, rem) = parsers::priority(msg)?; - - let mut result = Message { - priority: Priority::from_raw(priority), - timestamp: None, - hostname: None, - app_name: None, - proc_id: None, - message_id: None, - structured_data: None, - message: None, - }; - - let (version_item, rem) = parsers::header_item(rem, "version")?; - match version_item { - Some("1") => (), - _ => return Err(err_msg("invalid message, version not 1")) - }; - - let ts_rem; - let ts_attempt = parsers::iso8601_timestamp(rem); - if let Ok((timestamp, rem)) = ts_attempt { - result.timestamp = Some(timestamp); - ts_rem = rem; - } else { - let err = ts_attempt.unwrap_err(); - let (_, nil_rem) = parsers::byte(rem, b'-').map_err(move |_| err)?; - ts_rem = nil_rem; - } - - let (_, rem) = parsers::byte(ts_rem, b' ')?; - - let (hostname, rem) = parsers::header_item(rem, "hostname")?; - result.hostname = hostname; - - let (app_name, rem) = parsers::header_item(rem, "app_name")?; - result.app_name = app_name; - - let (proc_id, rem) = parsers::header_item(rem, "proc_id")?; - result.proc_id = proc_id; - - let (message_id, mut rem) = parsers::header_item(rem, "message_id")?; - result.message_id = message_id; - - let mut maybe_sd = parsers::structured_data_element(rem); - if maybe_sd.is_ok() { - while let Ok((sde, sd_rem)) = maybe_sd { - match result.structured_data { - None => result.structured_data = Some(vec![sde]), - Some(ref mut sd) => sd.push(sde) - } - rem = sd_rem; - maybe_sd = parsers::structured_data_element(rem); - } - } else { - let (_, sd_rem) = parsers::byte(rem, b'-')?; - rem = sd_rem; - } - - if let Ok((_, rem)) = parsers::byte(rem, b' ') { - let mut is_utf8 = false; - let mut message_bytes = rem; - if message_bytes.len() >= 3 && &message_bytes[..3] == b"\xEF\xBB\xBF" { - message_bytes = &message_bytes[3..]; - is_utf8 = true; - } - - result.message = if is_utf8 { - let trimmed = std::str::from_utf8(message_bytes)?.trim(); - if trimmed.len() > 0 { - Some(Cow::Borrowed(trimmed)) - } else { - None - } - } else { - let owned = String::from_utf8_lossy(message_bytes); - let trimmed = owned.trim(); - if trimmed.len() > 0 { - Some(Cow::Owned(trimmed.to_owned())) - } else { - None - } - }; - } - - Ok(result) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use chrono::{Datelike, TimeZone}; - use crate::test_util::to_timestamp; - use std::borrow::Cow::Borrowed; - - impl<'a> StructuredDataElement<'a> { - fn from_str(s: &'a str) -> Result { - let (r, rem) = parsers::structured_data_element(s.as_bytes())?; - if rem.len() > 0 { - Err(err_msg("too much input")) - } else { - Ok(r) - } - } - } - - #[test] - fn parse_rfc5424_syslog_message() { - // from docker alpine - let input = b"<30>1 2020-02-13T00:51:39.527825Z docker-desktop 8b1089798cf8 1481 8b1089798cf8 - hello world\n"; - - let expected = Message { - priority: Priority { - facility: 3, - severity: 6, - }, - timestamp: to_timestamp("2020-02-13T00:51:39.527825Z"), - hostname: Some("docker-desktop"), - app_name: Some("8b1089798cf8"), - proc_id: Some("1481"), - message_id: Some("8b1089798cf8"), - structured_data: None, - message: Some(Borrowed("hello world")), - }; - - let actual = Message::from_rfc5424_bytes(input).expect("could not parse input for syslog"); - - assert_eq!(expected, actual); - } - - #[test] - fn parse_rfc5424_syslog_requires_hostname() { - let input = b"<30>1 2020-02-13T00:51:39Z "; - - let actual = Message::from_rfc5424_bytes(input); - - assert_eq!("missing hostname", actual.unwrap_err().to_string()); - } - - #[test] - fn parse_rfc5424_syslog_specs_example_1() { - // example 1 from https://tools.ietf.org/html/rfc5424 - let input = b"<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - \xEF\xBB\xBF\xE2\x80\x99su root\xE2\x80\x99 failed for lonvick on /dev/pts/8\n"; - - let expected = Message { - priority: Priority { - facility: 4, - severity: 2, - }, - timestamp: to_timestamp("2003-10-11T22:14:15.003Z"), - hostname: Some("mymachine.example.com"), - app_name: Some("su"), - proc_id: None, - message_id: Some("ID47"), - structured_data: None, - message: Some(Borrowed("’su root’ failed for lonvick on /dev/pts/8")), - }; - - let actual = Message::from_rfc5424_bytes(input).expect("could not parse input for syslog"); - - assert_eq!(expected, actual); - } - - #[test] - fn parse_rfc5424_syslog_specs_example_2() { - // example 2 from https://tools.ietf.org/html/rfc5424 - let input = b"<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc 8710 - - %% It's time to make the do-nuts.\n"; - - let expected = Message { - priority: Priority { - facility: 20, - severity: 5, - }, - timestamp: to_timestamp("2003-08-24T05:14:15.000003-07:00"), - hostname: Some("192.0.2.1"), - app_name: Some("myproc"), - proc_id: Some("8710"), - message_id: None, - structured_data: None, - message: Some(Borrowed("%% It's time to make the do-nuts.")), - }; - - let actual = Message::from_rfc5424_bytes(input).expect("could not parse message"); - - assert_eq!(expected, actual); - } - - #[test] - fn parse_rfc5424_syslog_specs_example_3() { - // example 3 from https://tools.ietf.org/html/rfc5424 - let input = b"<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"] \xEF\xBB\xBFAn application event log entry...\n"; - - let mut sd_params = vec![]; - sd_params.push(("iut", "3".to_owned())); - sd_params.push(("eventSource", "Application".to_owned())); - sd_params.push(("eventID", "1011".to_owned())); - - let expected = Message { - priority: Priority { - facility: 20, - severity: 5, - }, - timestamp: to_timestamp("2003-10-11T22:14:15.003Z"), - hostname: Some("mymachine.example.com"), - app_name: Some("evntslog"), - proc_id: None, - message_id: Some("ID47"), - structured_data: Some(vec![StructuredDataElement { - id: "exampleSDID@32473", - params: sd_params, - }]), - message: Some(Borrowed("An application event log entry...")), - }; - - let actual = Message::from_rfc5424_bytes(input).expect("could not parse input for syslog"); - - assert_eq!(expected, actual); - } - - #[test] - fn parse_rfc5424_syslog_specs_example_4() { - // example 4 from https://tools.ietf.org/html/rfc5424 - - let input = b"<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"][examplePriority@32473 class=\"high\"]"; - - let mut sd_params = vec![]; - sd_params.push(("iut", "3".to_owned())); - sd_params.push(("eventSource", "Application".to_owned())); - sd_params.push(("eventID", "1011".to_owned())); - - let mut sd_params2 = vec![]; - sd_params2.push(("class", "high".to_owned())); - - let sd = vec![ - StructuredDataElement { - id: "exampleSDID@32473", - params: sd_params, - }, - StructuredDataElement { - id: "examplePriority@32473", - params: sd_params2, - }, - ]; - - let expected = Message { - priority: Priority { - facility: 20, - severity: 5, - }, - timestamp: to_timestamp("2003-10-11T22:14:15.003Z"), - hostname: Some("mymachine.example.com"), - app_name: Some("evntslog"), - proc_id: None, - message_id: Some("ID47"), - structured_data: Some(sd), - message: None, - }; - - let actual = Message::from_rfc5424_bytes(input).expect("could not parse input for syslog"); - - assert_eq!(expected, actual); - } - - #[test] - fn parse_rfc5424_empty_valid_syslog() { - let input = b"<0>1 - - - - - -"; - - let expected = Message { - priority: Priority { - facility: 0, - severity: 0, - }, - timestamp: None, - hostname: None, - app_name: None, - proc_id: None, - message_id: None, - structured_data: None, - message: None, - }; - - let actual = Message::from_rfc5424_bytes(input).expect("could not parse input for syslog"); - - assert_eq!(expected, actual); - } - - #[test] - fn structured_data_param_from_string() { - let input = "[exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"]"; - - let mut sd_params = vec![]; - sd_params.push(("iut", "3".to_owned())); - sd_params.push(("eventSource", "Application".to_owned())); - sd_params.push(("eventID", "1011".to_owned())); - - let expected = StructuredDataElement { - id: "exampleSDID@32473", - params: sd_params, - }; - - let actual = StructuredDataElement::from_str(input) - .expect("could not parse input for structured data element"); - - assert_eq!(expected, actual); - } - - #[test] - fn parse_rfc3164_example_2() { - let input = b"<34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8"; - - let now = Utc.ymd(2020, 10, 11).and_hms(0, 0, 0); - let msg = Message::from_rfc3164_bytes(input, &now); - - assert_eq!(msg.priority.facility, 4); - assert_eq!(msg.priority.severity, 2); - assert_eq!(msg.timestamp.unwrap().month(), 10); // Rest depends on local timezone ":-) - assert_eq!(msg.hostname, Some("mymachine")); - - // The 'tag' remains in the message; although we could extract 'su' as the tag, adherence to - // this format seems very patchy, and we're more likely to end up breaking messages that - // happen to include `:` by mistake. - assert_eq!(msg.message, Some(Borrowed("su: 'su root' failed for lonvick on /dev/pts/8"))); - } - - #[test] - fn parse_rfc3164_example_1() { - let input = b"Use the BFG!"; - - let msg = Message::from_rfc3164_bytes(input, &Utc::now()); - - assert_eq!("Use the BFG!", msg.message.unwrap()); - } -} +use crate::{ + data::parsers, + error::{ + err_msg, + Error, + }, +}; +use chrono::{ + DateTime, + Utc, +}; +use std::borrow::Cow; + +#[derive(Debug, Eq, PartialEq)] +pub struct Priority { + pub facility: u8, + pub severity: u8, +} + +impl Priority { + fn from_raw(raw: u8) -> Self { + let facility = raw / 8; + let severity = raw % 8; + + Priority { facility, severity } + } + + pub fn severity(&self) -> &'static str { + match self.severity { + 0 => "emerg", + 1 => "alert", + 2 => "crit", + 3 => "err", + 4 => "warning", + 5 => "notice", + 6 => "info", + _ => "debug", + } + } + + pub fn facility(&self) -> &'static str { + match self.facility { + 0 => "kern", + 1 => "user", + 2 => "mail", + 3 => "daemon", + 4 => "auth", + 5 => "syslog", + 6 => "lpr", + 7 => "news", + 8 => "uucp", + 9 => "cron", + 10 => "authpriv", + 11 => "ftp", + 12 => "ntp", + 13 => "security", + 14 => "console", + 15 => "solaris-cron", + 16 => "local0", + 17 => "local1", + 18 => "local2", + 19 => "local3", + 20 => "local4", + 21 => "local5", + 22 => "local6", + 23 => "local7", + _ => "unknown", + } + } +} + +#[derive(Debug, Eq, PartialEq)] +pub struct StructuredDataElement<'a> { + pub id: &'a str, + pub params: Vec<(&'a str, String)>, +} + +#[derive(Debug, Eq, PartialEq)] +pub struct Message<'a> { + pub priority: Priority, + pub timestamp: Option>, + pub hostname: Option<&'a str>, + pub app_name: Option<&'a str>, + pub proc_id: Option<&'a str>, + pub message_id: Option<&'a str>, + pub structured_data: Option>>, + pub message: Option>, +} + +impl<'a> Message<'a> { + pub fn from_str(s: &'a str) -> Self { + Self::from_bytes(s.as_bytes()) + } + + pub fn from_bytes(s: &'a [u8]) -> Self { + Self::from_rfc5424_bytes(s).unwrap_or_else(|_| Self::from_rfc3164_bytes(s, &Utc::now())) + } + + // RFC3164 format: TIMESTAMP HOSTNAME TAG: (MSG) + // We treat the tag as part of the message. + pub fn from_rfc3164_bytes(msg: &'a [u8], now: &DateTime) -> Self { + let mut unparsed = msg; + let mut result = Message { + priority: Priority::from_raw(13), + timestamp: None, + hostname: None, + app_name: None, + proc_id: None, + message_id: None, + structured_data: None, + message: None, + }; + + if let Ok((priority, rem)) = parsers::priority(unparsed) { + result.priority = Priority::from_raw(priority); + unparsed = rem; + + if let Ok((timestamp, rem)) = parsers::loose_timestamp(unparsed, now) { + result.timestamp = Some(timestamp); + unparsed = rem; + + if let Ok((_, rem)) = parsers::byte(unparsed, b' ') { + unparsed = rem; + + if let Ok((hostname, rem)) = parsers::header_item(unparsed, "hostname") { + result.hostname = hostname; + unparsed = rem; + } + } + } + } + + result.message = if unparsed.len() > 0 { + Some(String::from_utf8_lossy(unparsed)) + } else { + None + }; + + if result.timestamp.is_none() { + result.timestamp = Some(now.clone()) + } + + result + } + + // RFC5424 format: VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID STRUCTURED-DATA (MSG) + pub fn from_rfc5424_bytes(msg: &'a [u8]) -> Result { + let (priority, rem) = parsers::priority(msg)?; + + let mut result = Message { + priority: Priority::from_raw(priority), + timestamp: None, + hostname: None, + app_name: None, + proc_id: None, + message_id: None, + structured_data: None, + message: None, + }; + + let (version_item, rem) = parsers::header_item(rem, "version")?; + match version_item { + Some("1") => (), + _ => return Err(err_msg("invalid message, version not 1")), + }; + + let ts_rem; + let ts_attempt = parsers::iso8601_timestamp(rem); + if let Ok((timestamp, rem)) = ts_attempt { + result.timestamp = Some(timestamp); + ts_rem = rem; + } else { + let err = ts_attempt.unwrap_err(); + let (_, nil_rem) = parsers::byte(rem, b'-').map_err(move |_| err)?; + ts_rem = nil_rem; + } + + let (_, rem) = parsers::byte(ts_rem, b' ')?; + + let (hostname, rem) = parsers::header_item(rem, "hostname")?; + result.hostname = hostname; + + let (app_name, rem) = parsers::header_item(rem, "app_name")?; + result.app_name = app_name; + + let (proc_id, rem) = parsers::header_item(rem, "proc_id")?; + result.proc_id = proc_id; + + let (message_id, mut rem) = parsers::header_item(rem, "message_id")?; + result.message_id = message_id; + + let mut maybe_sd = parsers::structured_data_element(rem); + if maybe_sd.is_ok() { + while let Ok((sde, sd_rem)) = maybe_sd { + match result.structured_data { + None => result.structured_data = Some(vec![sde]), + Some(ref mut sd) => sd.push(sde), + } + rem = sd_rem; + maybe_sd = parsers::structured_data_element(rem); + } + } else { + let (_, sd_rem) = parsers::byte(rem, b'-')?; + rem = sd_rem; + } + + if let Ok((_, rem)) = parsers::byte(rem, b' ') { + let mut is_utf8 = false; + let mut message_bytes = rem; + if message_bytes.len() >= 3 && &message_bytes[..3] == b"\xEF\xBB\xBF" { + message_bytes = &message_bytes[3..]; + is_utf8 = true; + } + + result.message = if is_utf8 { + let trimmed = std::str::from_utf8(message_bytes)?.trim(); + if trimmed.len() > 0 { + Some(Cow::Borrowed(trimmed)) + } else { + None + } + } else { + let owned = String::from_utf8_lossy(message_bytes); + let trimmed = owned.trim(); + if trimmed.len() > 0 { + Some(Cow::Owned(trimmed.to_owned())) + } else { + None + } + }; + } + + Ok(result) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_util::to_timestamp; + use chrono::{ + Datelike, + TimeZone, + }; + use std::borrow::Cow::Borrowed; + + impl<'a> StructuredDataElement<'a> { + fn from_str(s: &'a str) -> Result { + let (r, rem) = parsers::structured_data_element(s.as_bytes())?; + if rem.len() > 0 { + Err(err_msg("too much input")) + } else { + Ok(r) + } + } + } + + #[test] + fn parse_rfc5424_syslog_message() { + // from docker alpine + let input = b"<30>1 2020-02-13T00:51:39.527825Z docker-desktop 8b1089798cf8 1481 8b1089798cf8 - hello world\n"; + + let expected = Message { + priority: Priority { + facility: 3, + severity: 6, + }, + timestamp: to_timestamp("2020-02-13T00:51:39.527825Z"), + hostname: Some("docker-desktop"), + app_name: Some("8b1089798cf8"), + proc_id: Some("1481"), + message_id: Some("8b1089798cf8"), + structured_data: None, + message: Some(Borrowed("hello world")), + }; + + let actual = Message::from_rfc5424_bytes(input).expect("could not parse input for syslog"); + + assert_eq!(expected, actual); + } + + #[test] + fn parse_rfc5424_syslog_requires_hostname() { + let input = b"<30>1 2020-02-13T00:51:39Z "; + + let actual = Message::from_rfc5424_bytes(input); + + assert_eq!("missing hostname", actual.unwrap_err().to_string()); + } + + #[test] + fn parse_rfc5424_syslog_specs_example_1() { + // example 1 from https://tools.ietf.org/html/rfc5424 + let input = b"<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - \xEF\xBB\xBF\xE2\x80\x99su root\xE2\x80\x99 failed for lonvick on /dev/pts/8\n"; + + let expected = Message { + priority: Priority { + facility: 4, + severity: 2, + }, + timestamp: to_timestamp("2003-10-11T22:14:15.003Z"), + hostname: Some("mymachine.example.com"), + app_name: Some("su"), + proc_id: None, + message_id: Some("ID47"), + structured_data: None, + message: Some(Borrowed("’su root’ failed for lonvick on /dev/pts/8")), + }; + + let actual = Message::from_rfc5424_bytes(input).expect("could not parse input for syslog"); + + assert_eq!(expected, actual); + } + + #[test] + fn parse_rfc5424_syslog_specs_example_2() { + // example 2 from https://tools.ietf.org/html/rfc5424 + let input = b"<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc 8710 - - %% It's time to make the do-nuts.\n"; + + let expected = Message { + priority: Priority { + facility: 20, + severity: 5, + }, + timestamp: to_timestamp("2003-08-24T05:14:15.000003-07:00"), + hostname: Some("192.0.2.1"), + app_name: Some("myproc"), + proc_id: Some("8710"), + message_id: None, + structured_data: None, + message: Some(Borrowed("%% It's time to make the do-nuts.")), + }; + + let actual = Message::from_rfc5424_bytes(input).expect("could not parse message"); + + assert_eq!(expected, actual); + } + + #[test] + fn parse_rfc5424_syslog_specs_example_3() { + // example 3 from https://tools.ietf.org/html/rfc5424 + let input = b"<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"] \xEF\xBB\xBFAn application event log entry...\n"; + + let mut sd_params = vec![]; + sd_params.push(("iut", "3".to_owned())); + sd_params.push(("eventSource", "Application".to_owned())); + sd_params.push(("eventID", "1011".to_owned())); + + let expected = Message { + priority: Priority { + facility: 20, + severity: 5, + }, + timestamp: to_timestamp("2003-10-11T22:14:15.003Z"), + hostname: Some("mymachine.example.com"), + app_name: Some("evntslog"), + proc_id: None, + message_id: Some("ID47"), + structured_data: Some(vec![StructuredDataElement { + id: "exampleSDID@32473", + params: sd_params, + }]), + message: Some(Borrowed("An application event log entry...")), + }; + + let actual = Message::from_rfc5424_bytes(input).expect("could not parse input for syslog"); + + assert_eq!(expected, actual); + } + + #[test] + fn parse_rfc5424_syslog_specs_example_4() { + // example 4 from https://tools.ietf.org/html/rfc5424 + + let input = b"<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"][examplePriority@32473 class=\"high\"]"; + + let mut sd_params = vec![]; + sd_params.push(("iut", "3".to_owned())); + sd_params.push(("eventSource", "Application".to_owned())); + sd_params.push(("eventID", "1011".to_owned())); + + let mut sd_params2 = vec![]; + sd_params2.push(("class", "high".to_owned())); + + let sd = vec![ + StructuredDataElement { + id: "exampleSDID@32473", + params: sd_params, + }, + StructuredDataElement { + id: "examplePriority@32473", + params: sd_params2, + }, + ]; + + let expected = Message { + priority: Priority { + facility: 20, + severity: 5, + }, + timestamp: to_timestamp("2003-10-11T22:14:15.003Z"), + hostname: Some("mymachine.example.com"), + app_name: Some("evntslog"), + proc_id: None, + message_id: Some("ID47"), + structured_data: Some(sd), + message: None, + }; + + let actual = Message::from_rfc5424_bytes(input).expect("could not parse input for syslog"); + + assert_eq!(expected, actual); + } + + #[test] + fn parse_rfc5424_empty_valid_syslog() { + let input = b"<0>1 - - - - - -"; + + let expected = Message { + priority: Priority { + facility: 0, + severity: 0, + }, + timestamp: None, + hostname: None, + app_name: None, + proc_id: None, + message_id: None, + structured_data: None, + message: None, + }; + + let actual = Message::from_rfc5424_bytes(input).expect("could not parse input for syslog"); + + assert_eq!(expected, actual); + } + + #[test] + fn structured_data_param_from_string() { + let input = "[exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"]"; + + let mut sd_params = vec![]; + sd_params.push(("iut", "3".to_owned())); + sd_params.push(("eventSource", "Application".to_owned())); + sd_params.push(("eventID", "1011".to_owned())); + + let expected = StructuredDataElement { + id: "exampleSDID@32473", + params: sd_params, + }; + + let actual = StructuredDataElement::from_str(input) + .expect("could not parse input for structured data element"); + + assert_eq!(expected, actual); + } + + #[test] + fn parse_rfc3164_example_2() { + let input = b"<34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8"; + + let now = Utc.with_ymd_and_hms(2020, 10, 11, 0, 0, 0).unwrap(); + let msg = Message::from_rfc3164_bytes(input, &now); + + assert_eq!(msg.priority.facility, 4); + assert_eq!(msg.priority.severity, 2); + assert_eq!(msg.timestamp.unwrap().month(), 10); // Rest depends on local timezone ":-) + assert_eq!(msg.hostname, Some("mymachine")); + + // The 'tag' remains in the message; although we could extract 'su' as the tag, adherence to + // this format seems very patchy, and we're more likely to end up breaking messages that + // happen to include `:` by mistake. + assert_eq!( + msg.message, + Some(Borrowed("su: 'su root' failed for lonvick on /dev/pts/8")) + ); + } + + #[test] + fn parse_rfc3164_example_1() { + let input = b"Use the BFG!"; + + let msg = Message::from_rfc3164_bytes(input, &Utc::now()); + + assert_eq!("Use the BFG!", msg.message.unwrap()); + } +} diff --git a/squiflog/src/diagnostics.rs b/squiflog/src/diagnostics.rs index 07fbfbd..422a6c6 100644 --- a/squiflog/src/diagnostics.rs +++ b/squiflog/src/diagnostics.rs @@ -1,316 +1,326 @@ -use std::{ - collections::HashMap, - fmt::Display, - ops::Drop, - str::FromStr, - sync::{ - atomic::{AtomicUsize, Ordering}, - mpsc, Mutex, - }, - thread, - time::Duration, -}; - -use chrono::{DateTime, Utc}; - -use crate::error::{err_msg, Error}; - -pub(crate) static MIN_LEVEL: MinLevel = MinLevel(AtomicUsize::new(0)); - -lazy_static! { - static ref DIAGNOSTICS: Mutex> = Mutex::new(None); -} - -/** -Diagnostics configuration. -*/ -#[derive(Debug, Clone)] -pub struct Config { - /** - The interval to sample metrics at. - */ - pub metrics_interval_ms: u64, - /** - The minimum self log level to emit. - */ - pub min_level: Level, -} - -impl Default for Config { - fn default() -> Self { - Config { - metrics_interval_ms: 1 * 1000 * 60, // 1 minute - min_level: Level::Error, - } - } -} - -/** -Initialize parse-wide diagnostics. -*/ -pub fn init(config: Config) { - let mut diagnostics = DIAGNOSTICS.lock().expect("failed to lock diagnostics"); - - if diagnostics.is_some() { - drop(diagnostics); - panic!("SYSLOG diagnostics have already been initialized"); - } - - MIN_LEVEL.set(config.min_level); - - // Only set up metrics if the minimum level is Debug - let metrics = if MIN_LEVEL.includes(Level::Debug) { - // NOTE: Diagnostics use a regular thread instead of `tokio` - // So that we can monitor metrics independently of the `tokio` - // runtime. - let (tx, rx) = mpsc::channel(); - let metrics_timeout = Duration::from_millis(config.metrics_interval_ms); - let handle = thread::spawn(move || loop { - match rx.recv_timeout(metrics_timeout) { - Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => { - emit_metrics(); - return; - } - Err(mpsc::RecvTimeoutError::Timeout) => { - emit_metrics(); - } - } - }); - - Some((tx, handle)) - } else { - None - }; - - *diagnostics = Some(Diagnostics { metrics }); -} - -/** -Stop parse-wide diagnostics. -*/ -pub fn stop() -> Result<(), Error> { - let mut diagnostics = DIAGNOSTICS.lock().expect("failed to lock diagnostics"); - - if let Some(mut diagnostics) = diagnostics.take() { - diagnostics.stop_metrics()?; - } - - Ok(()) -} - -struct Diagnostics { - metrics: Option<(mpsc::Sender<()>, thread::JoinHandle<()>)>, -} - -impl Diagnostics { - fn stop_metrics(&mut self) -> Result<(), Error> { - if let Some((tx, handle)) = self.metrics.take() { - tx.send(())?; - - handle - .join() - .map_err(|_| err_msg("failed to join diagnostics handle"))?; - } - - Ok(()) - } -} - -impl Drop for Diagnostics { - fn drop(&mut self) { - if let Some((tx, _)) = self.metrics.take() { - let _ = tx.send(()); - } - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum Level { - Debug, - Error, -} - -impl FromStr for Level { - type Err = Error; - - fn from_str(s: &str) -> Result { - match s { - "DEBUG" => Ok(Level::Debug), - "ERROR" => Ok(Level::Error), - _ => Err(err_msg("expected `DEBUG` or `ERROR`")), - } - } -} - -impl Level { - fn to_usize(self) -> usize { - match self { - Level::Debug => 0, - Level::Error => 1, - } - } - - fn from_usize(v: usize) -> Self { - match v { - 0 => Level::Debug, - _ => Level::Error, - } - } -} - -#[derive(Serialize)] -struct DiagnosticEvent<'a> { - #[serde(rename = "@t")] - timestamp: DateTime, - - #[serde(rename = "@l")] - level: &'static str, - - #[serde(rename = "@mt")] - message_template: &'static str, - - #[serde(rename = "@x")] - #[serde(skip_serializing_if = "Option::is_none")] - error: Option<&'a str>, - - #[serde(flatten)] - additional: Option, -} - -impl<'a> DiagnosticEvent<'a> { - pub fn new( - level: &'static str, - error: Option<&'a str>, - message_template: &'static str, - additional: Option, - ) -> DiagnosticEvent<'a> { - DiagnosticEvent { - timestamp: Utc::now(), - message_template, - level, - error, - additional, - } - } -} - -pub fn emit(message_template: &'static str) { - if MIN_LEVEL.includes(Level::Debug) { - let evt = DiagnosticEvent::new("DEBUG", None, &message_template, None); - let json = serde_json::to_string(&evt).expect("infallible JSON"); - eprintln!("{}", json); - } -} - -pub fn emit_err(error: &impl Display, message_template: &'static str) { - if MIN_LEVEL.includes(Level::Error) { - let err_str = format!("{}", error); - let evt = DiagnosticEvent::new("ERROR", Some(&err_str), &message_template, None); - let json = serde_json::to_string(&evt).expect("infallible JSON"); - eprintln!("{}", json); - } -} - -fn emit_metrics() { - if MIN_LEVEL.includes(Level::Debug) { - #[derive(Serialize)] - struct EmitMetrics { - data: HashMap<&'static str, usize>, - server: HashMap<&'static str, usize>, - } - - let mut metrics = EmitMetrics { - data: HashMap::new(), - server: HashMap::new(), - }; - - let data = METRICS.data.take(); - let server = METRICS.server.take(); - - metrics.data.extend(data.as_ref().iter().cloned()); - metrics.server.extend(server.as_ref().iter().cloned()); - - let metrics = serde_json::to_value(metrics).expect("infallible JSON"); - - let evt = DiagnosticEvent::new( - "DEBUG", - None, - "Collected SYSLOG server metrics", - Some(metrics), - ); - let json = serde_json::to_string(&evt).expect("infallible JSON"); - - eprintln!("{}", json); - } -} - -pub(crate) struct MinLevel(AtomicUsize); - -impl MinLevel { - fn set(&self, min: Level) { - MIN_LEVEL.0.store(min.to_usize(), Ordering::Relaxed); - } - - fn get(&self) -> Level { - Level::from_usize(MIN_LEVEL.0.load(Ordering::Relaxed)) - } - - pub(crate) fn includes(&self, level: Level) -> bool { - level.to_usize() >= self.get().to_usize() - } -} - -pub(crate) struct Metrics { - pub(crate) data: crate::data::Metrics, - pub(crate) server: crate::server::Metrics, - _private: (), -} - -pub(crate) static METRICS: Metrics = Metrics { - data: crate::data::Metrics::new(), - server: crate::server::Metrics::new(), - _private: (), -}; - -macro_rules! increment { - ($($metric:tt)*) => {{ - if $crate::diagnostics::MIN_LEVEL.includes($crate::diagnostics::Level::Debug) { - $crate::diagnostics::METRICS.$($metric)*.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - }}; -} - -macro_rules! metrics { - ($($metric:ident),*) => { - #[allow(dead_code)] - pub(crate) struct Metrics { - $( - pub(crate) $metric: std::sync::atomic::AtomicUsize, - )* - _private: (), - } - - impl Metrics { - #[allow(dead_code)] - pub(crate) const fn new() -> Self { - Metrics { - $( - $metric: std::sync::atomic::AtomicUsize::new(0), - )* - _private: (), - } - } - - #[allow(dead_code)] - pub(crate) fn take(&self) -> impl AsRef<[(&'static str, usize)]> { - let fields = [ - $( - (stringify!($metric), self.$metric.swap(0, std::sync::atomic::Ordering::Relaxed)), - )* - ]; - - fields - } - } - }; -} +use std::{ + collections::HashMap, + fmt::Display, + ops::Drop, + str::FromStr, + sync::{ + atomic::{ + AtomicUsize, + Ordering, + }, + mpsc, + Mutex, + }, + thread, + time::Duration, +}; + +use chrono::{ + DateTime, + Utc, +}; + +use crate::error::{ + err_msg, + Error, +}; + +pub(crate) static MIN_LEVEL: MinLevel = MinLevel(AtomicUsize::new(0)); + +lazy_static! { + static ref DIAGNOSTICS: Mutex> = Mutex::new(None); +} + +/** +Diagnostics configuration. +*/ +#[derive(Debug, Clone)] +pub struct Config { + /** + The interval to sample metrics at. + */ + pub metrics_interval_ms: u64, + /** + The minimum self log level to emit. + */ + pub min_level: Level, +} + +impl Default for Config { + fn default() -> Self { + Config { + metrics_interval_ms: 1 * 1000 * 60, // 1 minute + min_level: Level::Error, + } + } +} + +/** +Initialize parse-wide diagnostics. +*/ +pub fn init(config: Config) { + let mut diagnostics = DIAGNOSTICS.lock().expect("failed to lock diagnostics"); + + if diagnostics.is_some() { + drop(diagnostics); + panic!("SYSLOG diagnostics have already been initialized"); + } + + MIN_LEVEL.set(config.min_level); + + // Only set up metrics if the minimum level is Debug + let metrics = if MIN_LEVEL.includes(Level::Debug) { + // NOTE: Diagnostics use a regular thread instead of `tokio` + // So that we can monitor metrics independently of the `tokio` + // runtime. + let (tx, rx) = mpsc::channel(); + let metrics_timeout = Duration::from_millis(config.metrics_interval_ms); + let handle = thread::spawn(move || loop { + match rx.recv_timeout(metrics_timeout) { + Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => { + emit_metrics(); + return; + } + Err(mpsc::RecvTimeoutError::Timeout) => { + emit_metrics(); + } + } + }); + + Some((tx, handle)) + } else { + None + }; + + *diagnostics = Some(Diagnostics { metrics }); +} + +/** +Stop parse-wide diagnostics. +*/ +pub fn stop() -> Result<(), Error> { + let mut diagnostics = DIAGNOSTICS.lock().expect("failed to lock diagnostics"); + + if let Some(mut diagnostics) = diagnostics.take() { + diagnostics.stop_metrics()?; + } + + Ok(()) +} + +struct Diagnostics { + metrics: Option<(mpsc::Sender<()>, thread::JoinHandle<()>)>, +} + +impl Diagnostics { + fn stop_metrics(&mut self) -> Result<(), Error> { + if let Some((tx, handle)) = self.metrics.take() { + tx.send(())?; + + handle + .join() + .map_err(|_| err_msg("failed to join diagnostics handle"))?; + } + + Ok(()) + } +} + +impl Drop for Diagnostics { + fn drop(&mut self) { + if let Some((tx, _)) = self.metrics.take() { + let _ = tx.send(()); + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Level { + Debug, + Error, +} + +impl FromStr for Level { + type Err = Error; + + fn from_str(s: &str) -> Result { + match s { + "DEBUG" => Ok(Level::Debug), + "ERROR" => Ok(Level::Error), + _ => Err(err_msg("expected `DEBUG` or `ERROR`")), + } + } +} + +impl Level { + fn to_usize(self) -> usize { + match self { + Level::Debug => 0, + Level::Error => 1, + } + } + + fn from_usize(v: usize) -> Self { + match v { + 0 => Level::Debug, + _ => Level::Error, + } + } +} + +#[derive(Serialize)] +struct DiagnosticEvent<'a> { + #[serde(rename = "@t")] + timestamp: DateTime, + + #[serde(rename = "@l")] + level: &'static str, + + #[serde(rename = "@mt")] + message_template: &'static str, + + #[serde(rename = "@x")] + #[serde(skip_serializing_if = "Option::is_none")] + error: Option<&'a str>, + + #[serde(flatten)] + additional: Option, +} + +impl<'a> DiagnosticEvent<'a> { + pub fn new( + level: &'static str, + error: Option<&'a str>, + message_template: &'static str, + additional: Option, + ) -> DiagnosticEvent<'a> { + DiagnosticEvent { + timestamp: Utc::now(), + message_template, + level, + error, + additional, + } + } +} + +pub fn emit(message_template: &'static str) { + if MIN_LEVEL.includes(Level::Debug) { + let evt = DiagnosticEvent::new("DEBUG", None, &message_template, None); + let json = serde_json::to_string(&evt).expect("infallible JSON"); + eprintln!("{}", json); + } +} + +pub fn emit_err(error: &impl Display, message_template: &'static str) { + if MIN_LEVEL.includes(Level::Error) { + let err_str = format!("{}", error); + let evt = DiagnosticEvent::new("ERROR", Some(&err_str), &message_template, None); + let json = serde_json::to_string(&evt).expect("infallible JSON"); + eprintln!("{}", json); + } +} + +fn emit_metrics() { + if MIN_LEVEL.includes(Level::Debug) { + #[derive(Serialize)] + struct EmitMetrics { + data: HashMap<&'static str, usize>, + server: HashMap<&'static str, usize>, + } + + let mut metrics = EmitMetrics { + data: HashMap::new(), + server: HashMap::new(), + }; + + let data = METRICS.data.take(); + let server = METRICS.server.take(); + + metrics.data.extend(data.as_ref().iter().cloned()); + metrics.server.extend(server.as_ref().iter().cloned()); + + let metrics = serde_json::to_value(metrics).expect("infallible JSON"); + + let evt = DiagnosticEvent::new( + "DEBUG", + None, + "Collected SYSLOG server metrics", + Some(metrics), + ); + let json = serde_json::to_string(&evt).expect("infallible JSON"); + + eprintln!("{}", json); + } +} + +pub(crate) struct MinLevel(AtomicUsize); + +impl MinLevel { + fn set(&self, min: Level) { + MIN_LEVEL.0.store(min.to_usize(), Ordering::Relaxed); + } + + fn get(&self) -> Level { + Level::from_usize(MIN_LEVEL.0.load(Ordering::Relaxed)) + } + + pub(crate) fn includes(&self, level: Level) -> bool { + level.to_usize() >= self.get().to_usize() + } +} + +pub(crate) struct Metrics { + pub(crate) data: crate::data::Metrics, + pub(crate) server: crate::server::Metrics, + _private: (), +} + +pub(crate) static METRICS: Metrics = Metrics { + data: crate::data::Metrics::new(), + server: crate::server::Metrics::new(), + _private: (), +}; + +macro_rules! increment { + ($($metric:tt)*) => {{ + if $crate::diagnostics::MIN_LEVEL.includes($crate::diagnostics::Level::Debug) { + $crate::diagnostics::METRICS.$($metric)*.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + }}; +} + +macro_rules! metrics { + ($($metric:ident),*) => { + #[allow(dead_code)] + pub(crate) struct Metrics { + $( + pub(crate) $metric: std::sync::atomic::AtomicUsize, + )* + _private: (), + } + + impl Metrics { + #[allow(dead_code)] + pub(crate) const fn new() -> Self { + Metrics { + $( + $metric: std::sync::atomic::AtomicUsize::new(0), + )* + _private: (), + } + } + + #[allow(dead_code)] + pub(crate) fn take(&self) -> impl AsRef<[(&'static str, usize)]> { + let fields = [ + $( + (stringify!($metric), self.$metric.swap(0, std::sync::atomic::Ordering::Relaxed)), + )* + ]; + + fields + } + } + }; +} diff --git a/squiflog/src/error.rs b/squiflog/src/error.rs index 8fae343..5427234 100644 --- a/squiflog/src/error.rs +++ b/squiflog/src/error.rs @@ -1,62 +1,65 @@ -use std::{error, fmt}; - -pub struct Error(Inner); - -impl Error { - pub fn msg(msg: impl fmt::Display) -> Self { - err_msg(msg) - } -} - -struct Inner(String); - -impl fmt::Debug for Error { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - self.0.fmt(f) - } -} - -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - self.0.fmt(f) - } -} - -impl fmt::Debug for Inner { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - self.0.fmt(f) - } -} - -impl fmt::Display for Inner { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - self.0.fmt(f) - } -} - -impl error::Error for Inner {} - -impl From for Error -where - E: error::Error, -{ - fn from(err: E) -> Error { - Error(Inner(err.to_string())) - } -} - -impl From for Box { - fn from(err: Error) -> Box { - Box::new(err.0) - } -} - -impl From for Box { - fn from(err: Error) -> Box { - Box::new(err.0) - } -} - -pub(crate) fn err_msg(msg: impl fmt::Display) -> Error { - Error(Inner(msg.to_string())) -} +use std::{ + error, + fmt, +}; + +pub struct Error(Inner); + +impl Error { + pub fn msg(msg: impl fmt::Display) -> Self { + err_msg(msg) + } +} + +struct Inner(String); + +impl fmt::Debug for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.0.fmt(f) + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.0.fmt(f) + } +} + +impl fmt::Debug for Inner { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.0.fmt(f) + } +} + +impl fmt::Display for Inner { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.0.fmt(f) + } +} + +impl error::Error for Inner {} + +impl From for Error +where + E: error::Error, +{ + fn from(err: E) -> Error { + Error(Inner(err.to_string())) + } +} + +impl From for Box { + fn from(err: Error) -> Box { + Box::new(err.0) + } +} + +impl From for Box { + fn from(err: Error) -> Box { + Box::new(err.0) + } +} + +pub(crate) fn err_msg(msg: impl fmt::Display) -> Error { + Error(Inner(msg.to_string())) +} diff --git a/squiflog/src/main.rs b/squiflog/src/main.rs index 24f3bd2..ef6b5c8 100644 --- a/squiflog/src/main.rs +++ b/squiflog/src/main.rs @@ -1,89 +1,101 @@ -extern crate squiflog; - -use bytes::Bytes; -use std::{any::Any, io::Read, panic::catch_unwind, thread}; - -use squiflog::{ - config::{self, Config}, - data, - diagnostics::{self, emit, emit_err}, - error::Error, - server, -}; - -fn run() -> Result<(), Box> { - let config = Config::from_env()?; - - // Initialize diagnostics - diagnostics::init(config.diagnostics); - - // The processor for converting SYSLOG into CLEF - let process = { - let data = data::build(config.data); - move |msg: Bytes| data.read_as_clef(&*msg) - }; - - // The server that drives the receiver and processor - let mut server = server::build(config.server, process)?; - - // If we should listen for stdin to terminate - if config::is_seq_app() { - let handle = server - .take_handle() - .ok_or_else(|| Error::msg("Failed to acquire handle to server"))?; - - listen_for_stdin_closed(handle); - } - - // Run the server and wait for it to exit - server.run()?; - diagnostics::stop()?; - - Ok(()) -} - -fn listen_for_stdin_closed(handle: server::Handle) { - // NOTE: This is a regular thread instead of `tokio` - // so that we don't block with our synchronous read that - // will probably never actually return - thread::spawn(move || 'wait: loop { - match std::io::stdin().read(&mut [u8::default()]) { - Ok(0) => { - let _ = handle.close(); - break 'wait; - } - Ok(_) => { - continue 'wait; - } - Err(_) => { - let _ = handle.close(); - break 'wait; - } - } - }); -} - -fn main() { - let run_server: Result<(), Box> = catch_unwind(|| run()) - .map_err(|panic| unwrap_panic(panic).into()) - .and_then(|inner| inner); - - if let Err(err) = run_server { - emit_err(&err, "SYSLOG input failed"); - std::process::exit(1); - } - - emit("SYSLOG input stopped"); -} - -fn unwrap_panic(panic: Box) -> Error { - if let Some(err) = panic.downcast_ref::<&str>() { - return Error::msg(err); - } - - if let Some(err) = panic.downcast_ref::() { - return Error::msg(err); - } - - Error::msg("unexpected panic (this is a bug)") -} +extern crate squiflog; + +use bytes::Bytes; +use std::{ + any::Any, + io::Read, + panic::catch_unwind, + thread, +}; + +use squiflog::{ + config::{ + self, + Config, + }, + data, + diagnostics::{ + self, + emit, + emit_err, + }, + error::Error, + server, +}; + +fn run() -> Result<(), Box> { + let config = Config::from_env()?; + + // Initialize diagnostics + diagnostics::init(config.diagnostics); + + // The processor for converting SYSLOG into CLEF + let process = { + let data = data::build(config.data); + move |msg: Bytes| data.read_as_clef(&*msg) + }; + + // The server that drives the receiver and processor + let mut server = server::build(config.server, process)?; + + // If we should listen for stdin to terminate + if config::is_seq_app() { + let handle = server + .take_handle() + .ok_or_else(|| Error::msg("Failed to acquire handle to server"))?; + + listen_for_stdin_closed(handle); + } + + // Run the server and wait for it to exit + server.run()?; + diagnostics::stop()?; + + Ok(()) +} + +fn listen_for_stdin_closed(handle: server::Handle) { + // NOTE: This is a regular thread instead of `tokio` + // so that we don't block with our synchronous read that + // will probably never actually return + thread::spawn(move || 'wait: loop { + match std::io::stdin().read(&mut [u8::default()]) { + Ok(0) => { + let _ = handle.close(); + break 'wait; + } + Ok(_) => { + continue 'wait; + } + Err(_) => { + let _ = handle.close(); + break 'wait; + } + } + }); +} + +fn main() { + let run_server: Result<(), Box> = catch_unwind(|| run()) + .map_err(|panic| unwrap_panic(panic).into()) + .and_then(|inner| inner); + + if let Err(err) = run_server { + emit_err(&err, "SYSLOG input failed"); + std::process::exit(1); + } + + emit("SYSLOG input stopped"); +} + +fn unwrap_panic(panic: Box) -> Error { + if let Some(err) = panic.downcast_ref::<&str>() { + return Error::msg(err); + } + + if let Some(err) = panic.downcast_ref::() { + return Error::msg(err); + } + + Error::msg("unexpected panic (this is a bug)") +} diff --git a/squiflog/src/server/mod.rs b/squiflog/src/server/mod.rs index cd17354..cb3bd8d 100644 --- a/squiflog/src/server/mod.rs +++ b/squiflog/src/server/mod.rs @@ -1,188 +1,200 @@ -use std::{marker::Unpin, str::FromStr}; - -use futures::{future::BoxFuture, select, FutureExt, StreamExt}; - -use tokio::{runtime::Runtime, signal::ctrl_c, sync::oneshot}; - -use bytes::Bytes; - -use crate::diagnostics::*; -use crate::error::Error; - -mod udp; - -metrics! { - receive_ok, - receive_err, - process_ok, - process_err -} - -/** -Server configuration. -*/ -#[derive(Debug, Clone)] -pub struct Config { - /** - The address to bind the server to. - */ - pub bind: Bind, -} - -#[derive(Debug, Clone)] -pub struct Bind { - pub addr: String, - pub protocol: Protocol, -} - -#[derive(Debug, Clone, Copy)] -pub enum Protocol { - Udp, -} - -impl FromStr for Bind { - type Err = Error; - - fn from_str(s: &str) -> Result { - match s.get(0..6) { - Some("udp://") => Ok(Bind { - addr: s[6..].to_owned(), - protocol: Protocol::Udp, - }), - _ => Ok(Bind { - addr: s.to_owned(), - protocol: Protocol::Udp, - }), - } - } -} - -impl Default for Config { - fn default() -> Self { - Config { - bind: Bind { - addr: "0.0.0.0:514".to_owned(), - protocol: Protocol::Udp, - }, - } - } -} - -/** -A SYSLOG server. -*/ -pub struct Server { - fut: BoxFuture<'static, ()>, - handle: Option, -} - -impl Server { - pub fn take_handle(&mut self) -> Option { - self.handle.take() - } - - pub fn run(self) -> Result<(), Error> { - // Run the server on a fresh runtime - // We attempt to shut this runtime down cleanly to release - // any used resources - let runtime = Runtime::new().expect("failed to start new Runtime"); - - runtime.block_on(self.fut); - - Ok(()) - } -} - -/** -A handle to a running SYSLOG server that can be used to interact with it -programmatically. -*/ -pub struct Handle { - close: oneshot::Sender<()>, -} - -impl Handle { - /** - Close the server. - */ - pub fn close(self) -> bool { - self.close.send(()).is_ok() - } -} - -pub fn build( - config: Config, - mut process: impl FnMut(Bytes) -> Result<(), Error> + Send + Sync + Unpin + Clone + 'static, -) -> Result { - emit("Starting SYSLOG server"); - - let addr = config.bind.addr.parse()?; - let (handle_tx, handle_rx) = oneshot::channel(); - - // Build a handle - let handle = Some(Handle { close: handle_tx }); - - let server = async move { - let incoming = udp::Server::bind(&addr).await?.build(); - - let mut close = handle_rx.fuse(); - let mut ctrl_c = ctrl_c().boxed().fuse(); - let mut incoming = incoming.fuse(); - - // NOTE: We don't use `?` here because we never want to carry results - // We always want to match them and deal with error cases directly - loop { - select! { - // A message that's ready to process - msg = incoming.next() => match msg { - // A complete message has been received - Some(Ok(msg)) => { - increment!(server.receive_ok); - - // Process the received message - match process(msg) { - Ok(()) => { - increment!(server.process_ok); - } - Err(err) => { - increment!(server.process_err); - emit_err(&err, "SYSLOG processing failed"); - } - } - }, - // An error occurred receiving a chunk - Some(Err(err)) => { - increment!(server.receive_err); - emit_err(&err, "SYSLOG processing failed"); - }, - None => { - unreachable!("receiver stream should never terminate") - }, - }, - // A termination signal from the programmatic handle - _ = close => { - emit("Handle closed; shutting down"); - break; - }, - // A termination signal from the environment - _ = ctrl_c => { - emit("Termination signal received; shutting down"); - break; - }, - }; - } - - emit("Stopping SYSLOG server"); - - Result::Ok::<(), Error>(()) - }; - - Ok(Server { - fut: Box::pin(async move { - if let Err(err) = server.await { - emit_err(&err, "SYSLOG server failed"); - } - }), - handle, - }) -} +use std::{ + marker::Unpin, + str::FromStr, +}; + +use futures::{ + future::BoxFuture, + select, + FutureExt, + StreamExt, +}; + +use tokio::{ + runtime::Runtime, + signal::ctrl_c, + sync::oneshot, +}; + +use bytes::Bytes; + +use crate::diagnostics::*; +use crate::error::Error; + +mod udp; + +metrics! { + receive_ok, + receive_err, + process_ok, + process_err +} + +/** +Server configuration. +*/ +#[derive(Debug, Clone)] +pub struct Config { + /** + The address to bind the server to. + */ + pub bind: Bind, +} + +#[derive(Debug, Clone)] +pub struct Bind { + pub addr: String, + pub protocol: Protocol, +} + +#[derive(Debug, Clone, Copy)] +pub enum Protocol { + Udp, +} + +impl FromStr for Bind { + type Err = Error; + + fn from_str(s: &str) -> Result { + match s.get(0..6) { + Some("udp://") => Ok(Bind { + addr: s[6..].to_owned(), + protocol: Protocol::Udp, + }), + _ => Ok(Bind { + addr: s.to_owned(), + protocol: Protocol::Udp, + }), + } + } +} + +impl Default for Config { + fn default() -> Self { + Config { + bind: Bind { + addr: "0.0.0.0:514".to_owned(), + protocol: Protocol::Udp, + }, + } + } +} + +/** +A SYSLOG server. +*/ +pub struct Server { + fut: BoxFuture<'static, ()>, + handle: Option, +} + +impl Server { + pub fn take_handle(&mut self) -> Option { + self.handle.take() + } + + pub fn run(self) -> Result<(), Error> { + // Run the server on a fresh runtime + // We attempt to shut this runtime down cleanly to release + // any used resources + let runtime = Runtime::new().expect("failed to start new Runtime"); + + runtime.block_on(self.fut); + + Ok(()) + } +} + +/** +A handle to a running SYSLOG server that can be used to interact with it +programmatically. +*/ +pub struct Handle { + close: oneshot::Sender<()>, +} + +impl Handle { + /** + Close the server. + */ + pub fn close(self) -> bool { + self.close.send(()).is_ok() + } +} + +pub fn build( + config: Config, + mut process: impl FnMut(Bytes) -> Result<(), Error> + Send + Sync + Unpin + Clone + 'static, +) -> Result { + emit("Starting SYSLOG server"); + + let addr = config.bind.addr.parse()?; + let (handle_tx, handle_rx) = oneshot::channel(); + + // Build a handle + let handle = Some(Handle { close: handle_tx }); + + let server = async move { + let incoming = udp::Server::bind(&addr).await?.build(); + + let mut close = handle_rx.fuse(); + let mut ctrl_c = ctrl_c().boxed().fuse(); + let mut incoming = incoming.fuse(); + + // NOTE: We don't use `?` here because we never want to carry results + // We always want to match them and deal with error cases directly + loop { + select! { + // A message that's ready to process + msg = incoming.next() => match msg { + // A complete message has been received + Some(Ok(msg)) => { + increment!(server.receive_ok); + + // Process the received message + match process(msg) { + Ok(()) => { + increment!(server.process_ok); + } + Err(err) => { + increment!(server.process_err); + emit_err(&err, "SYSLOG processing failed"); + } + } + }, + // An error occurred receiving a chunk + Some(Err(err)) => { + increment!(server.receive_err); + emit_err(&err, "SYSLOG processing failed"); + }, + None => { + unreachable!("receiver stream should never terminate") + }, + }, + // A termination signal from the programmatic handle + _ = close => { + emit("Handle closed; shutting down"); + break; + }, + // A termination signal from the environment + _ = ctrl_c => { + emit("Termination signal received; shutting down"); + break; + }, + }; + } + + emit("Stopping SYSLOG server"); + + Result::Ok::<(), Error>(()) + }; + + Ok(Server { + fut: Box::pin(async move { + if let Err(err) = server.await { + emit_err(&err, "SYSLOG server failed"); + } + }), + handle, + }) +} diff --git a/squiflog/src/server/udp.rs b/squiflog/src/server/udp.rs index e1e2b84..bfda142 100644 --- a/squiflog/src/server/udp.rs +++ b/squiflog/src/server/udp.rs @@ -1,46 +1,58 @@ -use std::net::SocketAddr; - -use crate::{diagnostics::*, error::Error}; - -use bytes::{Bytes, BytesMut}; - -use futures::{Stream, StreamExt}; - -use tokio::net::UdpSocket; - -use tokio_util::{codec::Decoder, udp::UdpFramed}; - -pub(super) struct Server(UdpSocket); - -impl Server { - pub(super) async fn bind(addr: &SocketAddr) -> Result { - let sock = UdpSocket::bind(&addr).await?; - - Ok(Server(sock)) - } - - pub(super) fn build(self) -> impl Stream> { - emit("Setting up for UDP"); - - UdpFramed::new(self.0, Decode).map(|r| r.map(|(msg, _)| msg)) // ignore socket, just take message - } -} - -struct Decode; - -impl Decoder for Decode { - type Item = Bytes; - type Error = Error; - - fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { - // All datagrams are considered a valid message - // Split the Bytes mut into two components, and freeze the first one (initialised part, into a Bytes non-mut) - let src = src.split_to(src.len()).freeze(); - - if src.is_empty() { - return Ok(None); - } - - Ok(Some(src)) - } -} +use std::net::SocketAddr; + +use crate::{ + diagnostics::*, + error::Error, +}; + +use bytes::{ + Bytes, + BytesMut, +}; + +use futures::{ + Stream, + StreamExt, +}; + +use tokio::net::UdpSocket; + +use tokio_util::{ + codec::Decoder, + udp::UdpFramed, +}; + +pub(super) struct Server(UdpSocket); + +impl Server { + pub(super) async fn bind(addr: &SocketAddr) -> Result { + let sock = UdpSocket::bind(&addr).await?; + + Ok(Server(sock)) + } + + pub(super) fn build(self) -> impl Stream> { + emit("Setting up for UDP"); + + UdpFramed::new(self.0, Decode).map(|r| r.map(|(msg, _)| msg)) // ignore socket, just take message + } +} + +struct Decode; + +impl Decoder for Decode { + type Item = Bytes; + type Error = Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + // All datagrams are considered a valid message + // Split the Bytes mut into two components, and freeze the first one (initialised part, into a Bytes non-mut) + let src = src.split_to(src.len()).freeze(); + + if src.is_empty() { + return Ok(None); + } + + Ok(Some(src)) + } +} diff --git a/squiflog/src/test_util.rs b/squiflog/src/test_util.rs index 38cd76c..ad2344d 100644 --- a/squiflog/src/test_util.rs +++ b/squiflog/src/test_util.rs @@ -1,5 +1,12 @@ -use chrono::{DateTime, Utc}; +use chrono::{ + DateTime, + Utc, +}; pub fn to_timestamp(iso8601: &str) -> Option> { - Some(DateTime::parse_from_rfc3339(iso8601).expect("invalid test timestamp").with_timezone(&Utc)) + Some( + DateTime::parse_from_rfc3339(iso8601) + .expect("invalid test timestamp") + .with_timezone(&Utc), + ) }