diff --git a/Cargo.lock b/Cargo.lock index 612f200..e44710d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,24 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + +[[package]] +name = "allocator-api2" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -94,7 +112,7 @@ checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.52", ] [[package]] @@ -180,6 +198,21 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +[[package]] +name = "cassowary" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df8670b8c7b9dae1793364eafadf7239c40d669904660c5960d74cfd80b46a53" + +[[package]] +name = "castaway" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a17ed5635fc8536268e5d4de1e22e81ac34419e5f052d4d51f4e01dcc263fcc" +dependencies = [ + "rustversion", +] + [[package]] name = "cc" version = "1.0.89" @@ -237,7 +270,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.52", ] [[package]] @@ -246,6 +279,33 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" +[[package]] +name = "color-eyre" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55146f5e46f237f7423d74111267d4597b59b0dad0ffaf7303bce9945d843ad5" +dependencies = [ + "backtrace", + "color-spantrace", + "eyre", + "indenter", + "once_cell", + "owo-colors", + "tracing-error", +] + +[[package]] +name = "color-spantrace" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd6be1b2a7e382e2b98b43b2adcca6bb0e465af0bdd38123873ae61eb17a72c2" +dependencies = [ + "once_cell", + "owo-colors", + "tracing-core", + "tracing-error", +] + [[package]] name = "colorchoice" version = "1.0.0" @@ -253,13 +313,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" [[package]] -name = "colored" -version = "2.1.0" +name = "compact_str" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbf2150cce219b664a8a70df7a1f933836724b503f8a413af9365b4dcc4d90b8" +checksum = "f86b9c4c00838774a6d902ef931eff7470720c51d90c2e32cfe15dc304737b3f" dependencies = [ - "lazy_static", - "windows-sys 0.48.0", + "castaway", + "cfg-if", + "itoa", + "ryu", + "static_assertions", ] [[package]] @@ -302,6 +365,32 @@ version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" +[[package]] +name = "crossterm" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df" +dependencies = [ + "bitflags 2.4.2", + "crossterm_winapi", + "futures-core", + "libc", + "mio", + "parking_lot", + "signal-hook", + "signal-hook-mio", + "winapi", +] + +[[package]] +name = "crossterm_winapi" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b" +dependencies = [ + "winapi", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -333,15 +422,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "deranged" -version = "0.3.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" -dependencies = [ - "powerfmt", -] - [[package]] name = "digest" version = "0.10.7" @@ -353,6 +433,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "either" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11157ac094ffbdde99aa67b23417ebdd801842852b500e395a45a9c0aac03e4a" + [[package]] name = "encoding_rs" version = "0.8.33" @@ -378,6 +464,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "eyre" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cd915d99f24784cdc19fd37ef22b97e3ff0ae756c7e492e9fbfe897d61e2aec" +dependencies = [ + "indenter", + "once_cell", +] + [[package]] name = "fastrand" version = "2.0.1" @@ -470,7 +566,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.52", ] [[package]] @@ -554,6 +650,10 @@ name = "hashbrown" version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +dependencies = [ + "ahash", + "allocator-api2", +] [[package]] name = "heck" @@ -680,6 +780,12 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "indenter" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" + [[package]] name = "indexmap" version = "2.2.5" @@ -690,6 +796,12 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "indoc" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e186cfbae8084e513daff4240b4797e342f988cecda4fb6c939150f96315fd8" + [[package]] name = "instant" version = "0.1.12" @@ -705,6 +817,15 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.10" @@ -754,6 +875,15 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +[[package]] +name = "lru" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" +dependencies = [ + "hashbrown", +] + [[package]] name = "memchr" version = "2.7.1" @@ -782,6 +912,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", + "log", "wasi", "windows-sys 0.48.0", ] @@ -804,12 +935,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "num-conv" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" - [[package]] name = "num-traits" version = "0.2.18" @@ -829,15 +954,6 @@ dependencies = [ "libc", ] -[[package]] -name = "num_threads" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c7398b9c8b70908f6371f47ed36737907c87c52af34c268fed0bf0ceb92ead9" -dependencies = [ - "libc", -] - [[package]] name = "object" version = "0.32.2" @@ -856,21 +972,26 @@ dependencies = [ "base64 0.22.0", "chrono", "clap", + "color-eyre", + "crossterm", "csv", "futures", "hmac", "log", "poston", + "ratatui", "reqwest", "serde", "serde_derive", "serde_json", "serde_yaml", "sha2", + "signal-hook", "simple-logging", - "simple_logger", "tokio", "tokio-stream", + "tokio-util", + "tui-scrollview", ] [[package]] @@ -902,7 +1023,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.52", ] [[package]] @@ -923,6 +1044,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "owo-colors" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" + [[package]] name = "parking_lot" version = "0.12.1" @@ -993,12 +1120,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "powerfmt" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" - [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1053,6 +1174,26 @@ dependencies = [ "getrandom", ] +[[package]] +name = "ratatui" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcb12f8fbf6c62614b0d56eb352af54f6a22410c3b079eb53ee93c7b97dd31d8" +dependencies = [ + "bitflags 2.4.2", + "cassowary", + "compact_str", + "crossterm", + "indoc", + "itertools", + "lru", + "paste", + "stability", + "strum", + "unicode-segmentation", + "unicode-width", +] + [[package]] name = "redox_syscall" version = "0.1.57" @@ -1158,6 +1299,12 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustversion" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ffc183a10b4478d04cbbbfc96d0873219d962dd5accaff2ffbd4ceb7df837f4" + [[package]] name = "ryu" version = "1.0.17" @@ -1219,7 +1366,7 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.52", ] [[package]] @@ -1269,6 +1416,36 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "signal-hook" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8621587d4798caf8eb44879d42e56b9a93ea5dcd315a6487c357130095b62801" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = 
"signal-hook-mio" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af" +dependencies = [ + "libc", + "mio", + "signal-hook", +] + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -1289,18 +1466,6 @@ dependencies = [ "thread-id", ] -[[package]] -name = "simple_logger" -version = "4.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e7e46c8c90251d47d08b28b8a419ffb4aede0f87c2eea95e17d1d5bacbf3ef1" -dependencies = [ - "colored", - "log", - "time", - "windows-sys 0.48.0", -] - [[package]] name = "slab" version = "0.4.9" @@ -1326,18 +1491,67 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "stability" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebd1b177894da2a2d9120208c3386066af06a488255caabc5de8ddca22dbc3ce" +dependencies = [ + "quote", + "syn 1.0.109", +] + +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "strsim" version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5ee073c9e4cd00e28217186dbe12796d692868f432bf2e97ee73bed0c56dfa01" +[[package]] +name = "strum" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6cf59daf282c0a494ba14fd21610a0325f9f90ec9d1231dea26bcb1d696c946" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.52", +] + [[package]] name = "subtle" version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.52" @@ -1400,36 +1614,13 @@ dependencies = [ ] [[package]] -name = "time" -version = "0.3.34" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" -dependencies = [ - "deranged", - "itoa", - "libc", - "num-conv", - "num_threads", - "powerfmt", - "serde", - "time-core", - "time-macros", -] - -[[package]] -name = "time-core" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" - -[[package]] -name = "time-macros" -version = "0.2.17" +name = "thread_local" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" dependencies = [ - "num-conv", - "time-core", + "cfg-if", + "once_cell", ] [[package]] @@ -1474,7 +1665,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn", + 
"syn 2.0.52", ] [[package]] @@ -1535,6 +1726,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-error" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d686ec1c0f384b1277f097b2f279a2ecc11afe8c133c1aabf036a27cb4cd206e" +dependencies = [ + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "sharded-slab", + "thread_local", + "tracing-core", ] [[package]] @@ -1543,6 +1756,16 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tui-scrollview" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab6018c2fed0387ef031350342548eeca6a41ab5bcc511b2f3f2b11381f455d8" +dependencies = [ + "indoc", + "ratatui", +] + [[package]] name = "typenum" version = "1.17.0" @@ -1570,6 +1793,18 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" + +[[package]] +name = "unicode-width" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" + [[package]] name = "unsafe-libyaml" version = "0.2.10" @@ -1602,6 +1837,12 @@ dependencies = [ "getrandom", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vcpkg" version = "0.2.15" @@ -1650,7 +1891,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 2.0.52", "wasm-bindgen-shared", ] @@ -1684,7 +1925,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.52", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -1877,3 +2118,23 @@ dependencies = [ "cfg-if", "windows-sys 0.48.0", ] + +[[package]] +name = "zerocopy" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] diff --git a/Cargo.toml b/Cargo.toml index 65f04f3..2d64eeb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,22 +6,27 @@ edition = "2021" [dependencies] anyhow = "1.0.81" -log = "0.4.16" -simple_logger = "4.3.3" +tui-scrollview = "0.3.2" +ratatui = { version = "0.26.1", features = [] } +crossterm = { version = "0.27.0", features = ["event-stream"] } +color-eyre = "0.6.3" chrono = "0.4.19" futures = "0.3.21" reqwest = {version = "0.11.10", features = ["blocking", "json"]} -tokio = {version="1.17.0", features=["full"]} +tokio 
= { version = "1.17.0", features = ["full"] } tokio-stream = "0.1.8" -serde="1.0.136" +serde = "1.0.136" serde_yaml = "0.9.32" serde_json="1.0.79" serde_derive = "1.0.136" clap = { version = "4.5.2", features = ["derive"] } csv = "1.3.0" +log = { version = "0.4.21", features = ["std"] } poston = "0.7.8" base64 = "0.22.0" hmac = "0.12.1" sha2 = "0.10.8" async-trait = "0.1.77" simple-logging = "2.0.2" +tokio-util = "0.7.10" +signal-hook = "0.3.17" diff --git a/README.md b/README.md index 5ca63c2..6fd0b90 100644 --- a/README.md +++ b/README.md @@ -18,10 +18,19 @@ rewrite, I'm hoping I'll be able to maintain the smaller codebase in my limited - Csv file - Graylog - Fluentd +- Azure Log Analytics If you were using an interface that was dropped, keep using the previous version and raise an issue asking for the interface to be included. I don't mind writing an interface for one person, I only mind writing it for no one. + +#### Interactive interface + +An interactive terminal interface was added, which allows testing the API connection, retrieving logs, and load testing +by downloading each log an arbitrary number of times. This should allow live troubleshooting and testing, which might +make solving issues easier. You can use it by running the collector as normal, only adding the '--interactive' command +line parameter. + #### Add container releases While binaries will still be available, the primary method of release should be containers. This will hopefully @@ -32,6 +41,8 @@ be necessary. # Office365 audit log collector +![Screenshot.jpg](Screenshot.jpg) + Collect/retrieve Office365, Azure and DLP audit logs, optionally filter them, then send them to one or more outputs (see full list below). Onboarding is easy and takes only a few minutes (see 'Onboarding' section). There are Windows and Linux executables. diff --git a/Release/ConfigExamples/CsvOutput.yaml b/Release/ConfigExamples/CsvOutput.yaml index bd22c9b..b6c43eb 100644 --- a/Release/ConfigExamples/CsvOutput.yaml +++ b/Release/ConfigExamples/CsvOutput.yaml @@ -1,5 +1,4 @@ collect: - skipKnownLogs: True workingDir: /app contentTypes: Audit.General: True diff --git a/Release/ConfigExamples/fullConfig.yaml b/Release/ConfigExamples/fullConfig.yaml index 071ae44..3fa054d 100644 --- a/Release/ConfigExamples/fullConfig.yaml +++ b/Release/ConfigExamples/fullConfig.yaml @@ -15,6 +15,7 @@ collect: # Settings determining which audit logs to collect and how to do it retries: 3 # Times to retry retrieving a content blob if it fails skipKnownLogs: True # Remember retrieved log blobs, don't collect them twice hoursToCollect: 24 # Look back this many hours for audit logs (max supported by Office API is 168) + duplicate: 1 # Amount of times to download each log, can be used for performance testing by inflating the number of logs to download. Default is 1 filter: # Only logs that match ALL filters for a content type are collected. 
Leave empty to collect all Audit.General: Audit.AzureActiveDirectory: diff --git a/Release/Linux/OfficeAuditLogCollector b/Release/Linux/OfficeAuditLogCollector index 0f27a41..df2685f 100644 --- a/Release/Linux/OfficeAuditLogCollector +++ b/Release/Linux/OfficeAuditLogCollector @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:6cf5a102e87b8841728aa9b39152934b116c1591fc25a2de82e0343313573ad5 -size 6292760 +oid sha256:9b4d3320c7e109dbe6ed1f380ae16230f6ca8b9b005d815bdb44aeffc0f4e454 +size 7349496 diff --git a/Release/Windows/OfficeAuditLogCollector.exe b/Release/Windows/OfficeAuditLogCollector.exe index 0dcbfdf..a6f8c96 100644 --- a/Release/Windows/OfficeAuditLogCollector.exe +++ b/Release/Windows/OfficeAuditLogCollector.exe @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:78791d7113b976128bcf90294aa74b5a09cc3a8eebdcc79640468e979012fd7b -size 4945920 +oid sha256:69580b148a6b7ee23881a8562998028bdb113c6ec85c19f2cd9911ba7a3da4fc +size 6075904 diff --git a/Screenshot.jpg b/Screenshot.jpg new file mode 100644 index 0000000..89ddf0e Binary files /dev/null and b/Screenshot.jpg differ diff --git a/src/api_connection.rs b/src/api_connection.rs index 9979b85..bc248b9 100644 --- a/src/api_connection.rs +++ b/src/api_connection.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::time::Duration; use reqwest; use log::{debug, warn, error, info}; use reqwest::header::{AUTHORIZATION, CONTENT_TYPE, HeaderMap}; @@ -9,25 +10,26 @@ use futures::channel::mpsc::{Receiver, Sender}; use crate::config::Config; use crate::data_structures::{JsonList, StatusMessage, GetBlobConfig, GetContentConfig, AuthResult, ContentToRetrieve, CliArgs}; -use anyhow::Result; +use anyhow::{anyhow, Result}; use serde_json::Value; /// Return a logged in API connection object. Use the Headers value to make API requests. -pub async fn get_api_connection(args: CliArgs, config: Config) -> ApiConnection { +pub async fn get_api_connection(args: CliArgs, config: Config) -> Result { let mut api = ApiConnection { args, config, headers: HeaderMap::new(), }; - api.login().await; - api + api.login().await?; + Ok(api) } /// Abstraction of an API connection to Azure Management APIs. Can be used to login to the API /// which sets the headers. These headers can then be used to make authenticated requests. +#[derive(Clone)] pub struct ApiConnection { pub args: CliArgs, pub config: Config, @@ -36,7 +38,7 @@ pub struct ApiConnection { impl ApiConnection { /// Use tenant_id, client_id and secret_key to request a bearer token and store it in /// our headers. Must be called once before requesting any content. 
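// A minimal, self-contained sketch of the error-propagation style that login()
// switches to below: request failures bubble up through anyhow::Result instead
// of panicking. The token endpoint matches the auth_url built in login(); the
// exact form fields are assumptions based on the standard OAuth2
// client-credentials flow and are not taken verbatim from this diff.
use anyhow::{anyhow, Result};

async fn request_token(tenant_id: &str, params: &[(&str, &str)]) -> Result<String> {
    let auth_url = format!("https://login.microsoftonline.com/{}/oauth2/token", tenant_id);
    let response = reqwest::Client::new()
        .post(auth_url)
        .form(params)
        .send()
        .await?;
    if !response.status().is_success() {
        // Same pattern as the rewritten login(): report the body and return an
        // error instead of panicking.
        return Err(anyhow!("API login failed: {}", response.text().await?));
    }
    // The real code deserializes into AuthResult; serde_json::Value keeps this
    // sketch free of extra derives.
    let json: serde_json::Value = response.json().await?;
    let token = json["access_token"].as_str().unwrap_or_default();
    Ok(format!("bearer {}", token))
}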
- async fn login(&mut self) { + pub async fn login(&mut self) -> Result<()> { info!("Logging in to Office Management API."); let auth_url = format!("https://login.microsoftonline.com/{}/oauth2/token", self.args.tenant_id.to_string()); @@ -52,51 +54,72 @@ impl ApiConnection { self.headers.insert(CONTENT_TYPE, "application/x-www-form-urlencoded".parse().unwrap()); let login_client = reqwest::Client::new(); - let result = login_client + let response = login_client .post(auth_url) .headers(self.headers.clone()) .form(¶ms) .send() - .await; - let response = match result { - Ok(response) => response, - Err(e) => { - let msg = format!("Could not send API login request: {}", e); - error!("{}", msg); - panic!("{}", msg); - } - }; + .await?; if !response.status().is_success() { - let text = match response.text().await { - Ok(text) => text, - Err(e) => { - let msg = format!("Received error response to API login, but could not parse response: {}", e); - error!("{}", msg); - panic!("{}", msg); - } - }; + let text = response.text().await?; let msg = format!("Received error response to API login: {}", text); error!("{}", msg); - panic!("{}", msg); + return Err(anyhow!("{}", msg)); } - let json = match response.json::().await { - Ok(json) => json, - Err(e) => { - let msg = format!("Could not parse API login reply: {}", e); - error!("{}", msg); - panic!("{}", msg); - } - }; - + let json = response.json::().await?; let token = format!("bearer {}", json.access_token); self.headers.insert(AUTHORIZATION, token.parse().unwrap()); - info!("Successfully logged in to Office Management API.") + info!("Successfully logged in to Office Management API."); + Ok(()) } fn get_base_url(&self) -> String { format!("https://manage.office.com/api/v1.0/{}/activity/feed", self.args.tenant_id) } + pub async fn get_feeds(&self) -> Result> { + + let url = format!("{}/subscriptions/list", self.get_base_url()); + let client = reqwest::Client::new(); + let result: Vec> = client + .get(url) + .headers(self.headers.clone()) + .header("content-length", 0) + .send() + .await? 
+ .json() + .await?; + Ok(result.iter() + .filter(|x| x.get("status").unwrap() == "enabled") + .map(|x|x.get("contentType").unwrap().as_str().unwrap().to_string()) + .collect()) + } + + pub async fn set_subscription(&self, content_type: String, enable: bool) -> Result<()> { + + let action = if enable { "start" } else { "stop" }; + let url = format!("{}/subscriptions/{}?contentType={}", + self.get_base_url(), + action, + content_type + ); + debug!("Subscribing to {} feed.", content_type); + let client = reqwest::Client::new(); + let response = client + .post(url) + .headers(self.headers.clone()) + .header("content-length", 0) + .send() + .await?; + if !response.status().is_success() { + let text = response.text().await?; + let msg = format!("Received error response subscribing to audit feed {}: {}", content_type, text); + error!("{}", msg); + return Err(anyhow!("{}", msg)) + } + Ok(()) + } + pub async fn subscribe_to_feeds(&self) -> Result<()> { info!("Subscribing to audit feeds."); @@ -138,23 +161,7 @@ impl ApiConnection { } } for content_type in content_types { - let url = format!("{}/subscriptions/start?contentType={}", - self.get_base_url(), - content_type - ); - debug!("Subscribing to {} feed.", content_type); - let response = client - .post(url) - .headers(self.headers.clone()) - .header("content-length", 0) - .send() - .await?; - if !response.status().is_success() { - let text = response.text().await?; - let msg = format!("Received error response subscribing to audit feed {}: {}", content_type, text); - error!("{}", msg); - panic!("{}", msg); - } + self.set_subscription(content_type, true).await?; } info!("All audit feeds subscriptions exist."); Ok(()) @@ -196,7 +203,7 @@ impl ApiConnection { /// next page of content. This is sent over the blobs_tx channel to retrieve as well. If no /// additional pages exist, a status message is sent to indicate all content blobs for this /// content type have been retrieved. -#[tokio::main(flavor="multi_thread", worker_threads=200)] +#[tokio::main(flavor="multi_thread", worker_threads=20)] pub async fn get_content_blobs(config: GetBlobConfig, blobs_rx: Receiver<(String, String)>, known_blobs: HashMap) { @@ -204,19 +211,33 @@ pub async fn get_content_blobs(config: GetBlobConfig, blobs_rx: Receiver<(String let blobs_tx = config.blobs_tx.clone(); let blob_error_tx = config.blob_error_tx.clone(); - let status_tx = config.status_tx.clone(); + let mut status_tx = config.status_tx.clone(); let content_tx = config.content_tx.clone(); let client = config.client.clone(); let headers = config.headers.clone(); let content_type = content_type.clone(); let url = url.clone(); let known_blobs = known_blobs.clone(); + let duplicate = config.duplicate; async move { - match client.get(url.clone()).timeout(std::time::Duration::from_secs(5)). 
- headers(headers.clone()).send().await { + match client + .get(url.clone()) + .timeout(Duration::from_secs(5)) + .headers(headers.clone()).send().await { Ok(resp) => { - handle_blob_response(resp, blobs_tx, status_tx, content_tx, blob_error_tx, - content_type, url, &known_blobs).await; + if resp.status().is_success() { + handle_blob_response(resp, blobs_tx, status_tx, content_tx, blob_error_tx, + content_type, url, &known_blobs, duplicate).await; + } else { + if let Ok(text) = resp.text().await { + if text.to_lowercase().contains("too many request") { + status_tx.send(StatusMessage::BeingThrottled).await.unwrap(); + } else { + error!("Err getting blob response {}", text); + } + handle_blob_response_error(status_tx, blob_error_tx, content_type, url).await; + } + } }, Err(e) => { error!("Err getting blob response {}", e); @@ -236,17 +257,18 @@ async fn handle_blob_response( resp: reqwest::Response, blobs_tx: Sender<(String, String)>, mut status_tx: Sender, content_tx: Sender, mut blob_error_tx: Sender<(String, String)>, content_type: String, url: String, - known_blobs: &HashMap) { + known_blobs: &HashMap, duplicate: usize) { handle_blob_response_paging(&resp, blobs_tx, status_tx.clone(), content_type.clone()).await; - match resp.json::>>().await { + match resp.json::>>().await { Ok(i) => { - handle_blob_response_content_uris(status_tx, content_tx, content_type, i, known_blobs) + handle_blob_response_content_uris(status_tx, content_tx, content_type, i, known_blobs, + duplicate) .await; }, Err(e) => { - warn!("Err getting blob JSON {}", e); + warn!("Error getting blob JSON {}", e); match blob_error_tx.send((content_type, url)).await { Err(e) => { error!("Could not resend failed blob, dropping it: {}", e); @@ -288,7 +310,8 @@ async fn handle_blob_response_paging( /// over the content_tx channel for the content thread to retrieve. async fn handle_blob_response_content_uris( mut status_tx: Sender, mut content_tx: Sender, - content_type: String, content_json: JsonList, known_blobs: &HashMap) { + content_type: String, content_json: JsonList, known_blobs: &HashMap, + duplicate: usize) { for json_dict in content_json.into_iter() { if json_dict.contains_key("contentUri") == false { @@ -313,12 +336,19 @@ async fn handle_blob_response_content_uris( let content_to_retrieve = ContentToRetrieve { expiration, content_type: content_type.clone(), content_id, url}; - content_tx.send(content_to_retrieve).await.unwrap_or_else( - |e| panic!("Could not send found content, channel closed?: {}", e) - ); - status_tx.send(StatusMessage::FoundNewContentBlob).await.unwrap_or_else( - |e| panic!("Could not send status update, channel closed?: {}", e) - ); + if duplicate <= 1 { + content_tx.send(content_to_retrieve).await.unwrap_or_else( + |e| panic!("Could not send found content, channel closed?: {}", e)); + status_tx.send(StatusMessage::FoundNewContentBlob).await.unwrap_or_else( + |e| panic!("Could not send status update, channel closed?: {}", e)); + } else { + for _ in 0..duplicate { + content_tx.send(content_to_retrieve.clone()).await.unwrap_or_else( + |e| panic!("Could not send found content, channel closed?: {}", e)); + status_tx.send(StatusMessage::FoundNewContentBlob).await.unwrap_or_else( + |e| panic!("Could not send status update, channel closed?: {}", e)); + } + } } }; } @@ -341,8 +371,9 @@ async fn handle_blob_response_error( /// Retrieve the actual ContentUris found in the JSON body of content blobs. 
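// Minimal sketch of the bounded fan-out used for the blob and content
// downloads below: a stream of work items is processed with
// for_each_concurrent, so at most `limit` requests are in flight at once
// (get_content passes config.threads for that limit). The URLs and the
// worker body are placeholders, not the project's real request logic.
use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    let urls = vec!["https://example.invalid/a", "https://example.invalid/b"];
    let limit = 4;
    stream::iter(urls)
        .for_each_concurrent(limit, |url| async move {
            // A real worker issues the reqwest GET with a timeout here and
            // forwards the response over a channel.
            println!("would fetch {}", url);
        })
        .await;
}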
-#[tokio::main(flavor="multi_thread", worker_threads=200)] +#[tokio::main(flavor="multi_thread", worker_threads=50)] pub async fn get_content(config: GetContentConfig, content_rx: Receiver) { + content_rx.for_each_concurrent(config.threads, |content_to_retrieve| { let client = config.client.clone(); let headers = config.headers.clone(); @@ -351,7 +382,10 @@ pub async fn get_content(config: GetContentConfig, content_rx: Receiver { handle_content_response(resp, result_tx, status_tx, content_error_tx, content_to_retrieve).await; @@ -363,7 +397,7 @@ pub async fn get_content(config: GetContentConfig, content_rx: Receiver, mut content_error_tx: Sender, content_to_retrieve: ContentToRetrieve) { + if !resp.status().is_success() { + match content_error_tx.send(content_to_retrieve).await { + Err(_) => { + status_tx.send(StatusMessage::ErrorContentBlob).await.unwrap_or_else( + |e| panic!("Could not send status update, channel closed?: {}", e) + ); + }, + _=> (), + } + if let Ok(text) = resp.text().await { + if text.to_lowercase().contains("too many request") { + match status_tx.send(StatusMessage::BeingThrottled).await { + Err(e) => { + error!("Could not send status message: {}", e); + }, + _=> (), + } + } + } + return + } + match resp.text().await { Ok(json) => { result_tx.send((json, content_to_retrieve)).await.unwrap_or_else( @@ -383,8 +439,7 @@ async fn handle_content_response( Err(e) => { warn!("Error interpreting JSON: {}", e); match content_error_tx.send(content_to_retrieve).await { - Err(e) => { - error!("Could not resend failed content, dropping it: {}", e); + Err(_) => { status_tx.send(StatusMessage::ErrorContentBlob).await.unwrap_or_else( |e| panic!("Could not send status update, channel closed?: {}", e) ); diff --git a/src/collector.rs b/src/collector.rs index f836a0d..4774fe8 100644 --- a/src/collector.rs +++ b/src/collector.rs @@ -2,22 +2,28 @@ use std::thread; use std::collections::HashMap; use std::mem::swap; use std::ops::Div; -use std::time::Instant; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use anyhow::Result; use log::{warn, error, info}; use futures::{SinkExt}; use futures::channel::mpsc::channel; use futures::channel::mpsc::{Sender, Receiver}; use serde_json::Value; +use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::Mutex; +use tokio::time::sleep; use crate::data_structures; use crate::api_connection; use crate::api_connection::ApiConnection; use crate::config::{Config, ContentTypesSubConfig}; -use crate::data_structures::{ArbitraryJson, Caches, CliArgs, ContentToRetrieve, JsonList}; +use crate::data_structures::{ArbitraryJson, Caches, CliArgs, ContentToRetrieve, JsonList, RunState}; use crate::interfaces::azure_oms_interface::OmsInterface; use crate::interfaces::interface::Interface; use crate::interfaces::file_interface::FileInterface; use crate::interfaces::fluentd_interface::FluentdInterface; use crate::interfaces::graylog_interface::GraylogInterface; +use crate::interfaces::interactive_interface::InteractiveInterface; /// # Office Audit Log Collector @@ -30,8 +36,7 @@ use crate::interfaces::graylog_interface::GraylogInterface; /// interfaces. Active interfaces are determined by the config file passed in by the user. 
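// Sketch of the trait-object fan-out described above: the collector keeps a
// Vec<Box<dyn Interface + Send>> and hands every parsed log to each active
// output. The real Interface trait lives in src/interfaces/ and its method
// names are not shown in this diff, so send_log below is a hypothetical
// stand-in used only for illustration.
trait Interface: Send {
    fn send_log(&mut self, log: &str, content_type: &str);
}

struct StdoutInterface;

impl Interface for StdoutInterface {
    fn send_log(&mut self, log: &str, content_type: &str) {
        println!("[{}] {}", content_type, log);
    }
}

fn dispatch(interfaces: &mut [Box<dyn Interface + Send>], log: &str, content_type: &str) {
    for interface in interfaces.iter_mut() {
        interface.send_log(log, content_type);
    }
}

fn main() {
    let mut interfaces: Vec<Box<dyn Interface + Send>> = vec![Box::new(StdoutInterface)];
    dispatch(&mut interfaces, r#"{"Operation":"FileAccessed"}"#, "Audit.SharePoint");
}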
pub struct Collector { config: Config, - interfaces: Vec>, - + interfaces: Vec>, result_rx: Receiver<(String, ContentToRetrieve)>, stats_rx: Receiver<(usize, usize, usize, usize)>, kill_tx: tokio::sync::mpsc::Sender, @@ -43,10 +48,19 @@ pub struct Collector { impl Collector { - pub async fn new(args: CliArgs, config: Config, runs: HashMap>) -> Collector { + pub async fn new(args: CliArgs, + config: Config, + runs: HashMap>, + state: Arc>, + interactive_sender: Option>> + ) -> Result { + info!("Initializing collector."); // Initialize interfaces - let mut interfaces: Vec> = Vec::new(); + let mut interfaces: Vec> = Vec::new(); + if args.interactive { + interfaces.push(Box::new(InteractiveInterface::new(interactive_sender.unwrap()))); + } if config.output.file.is_some() { interfaces.push(Box::new(FileInterface::new(config.clone()))); } @@ -61,14 +75,8 @@ impl Collector { } // Initialize collector threads - let api = api_connection::get_api_connection( - args.clone(), config.clone() - ).await; - if let Err(e) = api.subscribe_to_feeds().await { - let msg = format!("Error subscribing to audit feeds: {}", e); - error!("{}", msg); - panic!("{}", msg); - } + let api = api_connection::get_api_connection(args.clone(), config.clone()).await?; + api.subscribe_to_feeds().await?; let known_blobs = config.load_known_blobs(); let (result_rx, stats_rx, kill_tx) = @@ -76,7 +84,8 @@ impl Collector { config.collect.content_types, runs.clone(), &config, - known_blobs.clone()); + known_blobs.clone(), + state); // Initialize collector let cache_size = config.collect.cache_size.unwrap_or(500000); @@ -87,7 +96,7 @@ impl Collector { } else { HashMap::new() }; - Collector { + let collector = Collector { config, interfaces, result_rx, @@ -97,7 +106,8 @@ impl Collector { kill_tx, filters, cache - } + }; + Ok(collector) } /// Monitor all started content retrieval threads, processing results and terminating @@ -121,6 +131,7 @@ impl Collector { // Check if a log came in. 
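// Sketch of the non-blocking drain that check_results/check_all_results use
// below: try_next() pulls whatever is already buffered on the futures mpsc
// channel without awaiting, so the monitor loop can interleave result handling
// with the stats and kill-signal checks. The payload is simplified to String.
use futures::channel::mpsc::channel;
use futures::SinkExt;

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = channel::<String>(10);
    tx.send("log blob 1".to_string()).await.unwrap();
    tx.send("log blob 2".to_string()).await.unwrap();

    let mut amount = 0;
    while let Ok(Some(msg)) = rx.try_next() {
        println!("handling {}", msg);
        amount += 1;
    }
    println!("drained {} buffered results", amount);
}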
self.check_results().await; } + self.check_all_results().await; self.end_run(); } @@ -128,21 +139,35 @@ impl Collector { self.config.save_known_blobs(&self.known_blobs); } - async fn check_results(&mut self) { + pub async fn check_results(&mut self) -> usize { if let Ok(Some((msg, content))) = self.result_rx.try_next() { - self.handle_content(msg, content).await; + self.handle_content(msg, content).await + } else { + 0 + } + } + + pub async fn check_all_results(&mut self) -> usize { + + let mut amount = 0; + while let Ok(Some((msg, content))) = self.result_rx.try_next() { + amount += self.handle_content(msg, content).await; } + amount } - async fn handle_content(&mut self, msg: String, content: ContentToRetrieve) { + async fn handle_content(&mut self, msg: String, content: ContentToRetrieve) -> usize { self.known_blobs.insert(content.content_id.clone(), content.expiration.clone()); if let Ok(logs) = serde_json::from_str::(&msg) { + let amount = logs.len(); for log in logs { self.handle_log(log, &content).await; } + amount } else { - warn!("Skipped log that could not be parsed: {}", content.content_id) + warn!("Skipped log that could not be parsed: {}", content.content_id); + 0 } } @@ -165,7 +190,7 @@ impl Collector { self.output().await; } } - async fn check_stats(&mut self) -> bool { + pub async fn check_stats(&mut self) -> bool { if let Ok(Some((found, successful, @@ -181,7 +206,6 @@ impl Collector { self.saved, ); info!("{}", output); - println!("{}", output); true } else { false @@ -204,11 +228,12 @@ impl Collector { fn get_output_string(&self, found: usize, successful: usize, failed: usize, retried: usize, saved: usize) -> String { format!("\ - Blobs found: {}\n\ - Blobs successful: {}\n\ - Blobs failed: {}\n\ - Blobs retried: {}\n\ - Logs saved: {}\n", +Done!|| +Blobs found: {}|| +Blobs successful: {}|| +Blobs failed: {}|| +Blobs retried: {}|| +Logs saved: {}", found, successful, failed, retried, saved ) } @@ -271,7 +296,8 @@ fn initialize_channels( headers: api.headers.clone(), status_tx: status_tx.clone(), blobs_tx: blobs_tx.clone(), blob_error_tx: blob_error_tx.clone(), content_tx: content_tx.clone(), - threads: config.collect.max_threads.unwrap_or(50) + threads: config.collect.max_threads.unwrap_or(50), + duplicate: config.collect.duplicate.unwrap_or(1), }; let content_config = data_structures::GetContentConfig { @@ -306,7 +332,8 @@ fn get_available_content(api: ApiConnection, content_types: ContentTypesSubConfig, runs: HashMap>, config: &Config, - known_blobs: HashMap) + known_blobs: HashMap, + state: Arc>) -> (Receiver<(String, ContentToRetrieve)>, Receiver<(usize, usize, usize, usize)>, tokio::sync::mpsc::Sender) { @@ -325,7 +352,8 @@ fn get_available_content(api: ApiConnection, message_loop_config, blobs_rx, content_rx, - known_blobs); + known_blobs, + state); (result_rx, stats_rx, kill_tx) } @@ -339,11 +367,13 @@ fn spawn_blob_collector( message_loop_config: data_structures::MessageLoopConfig, blobs_rx: Receiver<(String, String)>, content_rx: Receiver, - known_blobs: HashMap) { + known_blobs: HashMap, + state: Arc>) { + info!("Spawned collector threads"); thread::spawn( move || {api_connection::get_content_blobs(blob_config, blobs_rx, known_blobs);}); thread::spawn( move || {api_connection::get_content(content_config, content_rx);}); - thread::spawn(move || {message_loop(message_loop_config)}); + thread::spawn(move || {message_loop(message_loop_config, state)}); } @@ -352,23 +382,29 @@ fn spawn_blob_collector( /// awaiting_content_blobs is incremented; every time content is 
retrieved or could not be /// retrieved awaiting_content_blobs is decremented. When it reaches 0 we know we are done. #[tokio::main] -pub async fn message_loop(mut config: data_structures::MessageLoopConfig) { +pub async fn message_loop(mut config: data_structures::MessageLoopConfig, + mut state: Arc>) { // Send base URLS for content blob retrieval then keep track of when they've all come in - let mut awaiting_content_types: usize = 0; for (content_type, base_url) in config.urls.into_iter() { config.blobs_tx.clone().send((content_type, base_url)).await.unwrap(); - awaiting_content_types += 1; + state.lock().await.awaiting_content_types += 1; } - // Keep track of found and retrieved content blobs - let mut awaiting_content_blobs: usize = 0; - // Keep track of retry count for failed blobs - let mut retry_map: HashMap = HashMap::new(); - // Keep stats to return to python after run finishes - let mut stats = data_structures::RunStatistics::new(); + + let mut rate_limit_backoff_started: Option = None; + let mut retry_map = HashMap::new(); // Loop ends with the run itself, signalling the program is done. loop { + if let Some(t) = rate_limit_backoff_started { + if t.elapsed().as_secs() >= 30 { + rate_limit_backoff_started = None; + state.lock().await.rate_limited = false; + info!("Release rate limit"); + } + } + + if let Ok(msg) = config.kill_rx.try_recv() { if msg { info!("Stopping collector."); @@ -379,30 +415,31 @@ pub async fn message_loop(mut config: data_structures::MessageLoopConfig) { // been found, and all found blobs have been retrieved, we are done. if let Ok(Some(msg)) = config.status_rx.try_next() { match msg { + // We have found a new content blob while iterating through the pages of them. + // It has been queued up to be retrieved. + data_structures::StatusMessage::FoundNewContentBlob => { + state.lock().await.awaiting_content_blobs +=1; + state.lock().await.stats.blobs_found += 1; + }, // awaiting_content_types is initially the size of content type * runs for each // content type. When retrieving pages if we don't get a NextPageUri response // header, we know we have found all possible blobs for that content type and // we decrement awaiting_content_types. When it hits 0 we know we found all // content that can possible be retrieved. data_structures::StatusMessage::FinishedContentBlobs => { - awaiting_content_types = awaiting_content_types.saturating_sub(1); - if awaiting_content_types == 0 && awaiting_content_blobs == 0 { + let new_content_types = state.lock().await.awaiting_content_types.saturating_sub(1); + state.lock().await.awaiting_content_types = new_content_types; + if check_done(&mut state).await { break } }, - // We have found a new content blob while iterating through the pages of them. - // It has been queued up to be retrieved. - data_structures::StatusMessage::FoundNewContentBlob => { - awaiting_content_blobs +=1; - stats.blobs_found += 1; - }, // A queued up content blob has actually been retrieved so we are done with it. // When awaiting_content_blobs hits 0 we are done retrieving all actual content // and we can exit. 
data_structures::StatusMessage::RetrievedContentBlob => { - awaiting_content_blobs -= 1; - stats.blobs_successful += 1; - if awaiting_content_types == 0 && awaiting_content_blobs == 0 { + state.lock().await.awaiting_content_blobs -= 1; + state.lock().await.stats.blobs_successful += 1; + if check_done(&mut state).await { config.content_tx.close_channel(); break; } @@ -411,14 +448,20 @@ pub async fn message_loop(mut config: data_structures::MessageLoopConfig) { // When awaiting_content_blobs hits 0 we are done retrieving all actual content // and we can exit. data_structures::StatusMessage::ErrorContentBlob => { - awaiting_content_blobs -= 1; - stats.blobs_error += 1; - if awaiting_content_types == 0 && awaiting_content_blobs == 0 { + state.lock().await.awaiting_content_blobs -= 1; + state.lock().await.stats.blobs_error += 1; + if check_done(&mut state).await { config.content_tx.close_channel(); break; } } - data_structures::StatusMessage::BeingThrottled => warn!("Throttled!"), // TODO: handle being throttled + data_structures::StatusMessage::BeingThrottled => { + if rate_limit_backoff_started.is_none() { + warn!("Being rate limited, backing off 30 seconds."); + state.lock().await.rate_limited = true; + rate_limit_backoff_started = Some(Instant::now()); + } + } } } // Check channel for content pages that could not be retrieved and retry them the user @@ -428,18 +471,20 @@ pub async fn message_loop(mut config: data_structures::MessageLoopConfig) { let retries_left = retry_map.get_mut(&url).unwrap(); if retries_left == &mut 0 { error!("Gave up on blob {}", url); - awaiting_content_types -= 1; - stats.blobs_error += 1; + state.lock().await.awaiting_content_types -= 1; + state.lock().await.stats.blobs_error += 1; } else { - *retries_left -= 1; - stats.blobs_retried += 1; + if rate_limit_backoff_started.is_none() { + *retries_left -= 1; + } + state.lock().await.stats.blobs_retried += 1; warn!("Retry blob {} {}", retries_left, url); config.blobs_tx.send((content_type, url)).await.unwrap(); } } else { retry_map.insert(url.clone(), config.retries - 1); - stats.blobs_retried += 1; + state.lock().await.stats.blobs_retried += 1; warn!("Retry blob {} {}", config.retries - 1, url); config.blobs_tx.send((content_type, url)).await.unwrap(); } @@ -447,28 +492,40 @@ pub async fn message_loop(mut config: data_structures::MessageLoopConfig) { // Check channel for content blobs that could not be retrieved and retry them the user // defined amount of times. If we can't in that amount of times then give up. if let Ok(Some(content)) = config.content_error_rx.try_next() { + state.lock().await.stats.blobs_retried += 1; if retry_map.contains_key(&content.url) { let retries_left = retry_map.get_mut(&content.url).unwrap(); if retries_left == &mut 0 { error!("Gave up on content {}", content.url); - awaiting_content_blobs -= 1; - stats.blobs_error += 1; + state.lock().await.awaiting_content_blobs -= 1; + state.lock().await.stats.blobs_error += 1; } else { - *retries_left -= 1; - stats.blobs_retried += 1; + if rate_limit_backoff_started.is_none() { + *retries_left -= 1; + } warn!("Retry content {} {}", retries_left, content.url); config.content_tx.send(content).await.unwrap(); - } } else { retry_map.insert(content.url.to_string(), config.retries - 1); - stats.blobs_retried += 1; + state.lock().await.stats.blobs_retried += 1; warn!("Retry content {} {}", config.retries - 1, content.url); config.content_tx.send(content).await.unwrap(); } } } // We send back stats after exiting the loop, signalling the end of the run. 
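// Compact sketch of two patterns from the loop above: the run counters now
// live in a shared Arc<tokio::sync::Mutex<RunState>> so the interactive UI
// can read them, and a throttle report sets rate_limited until a fixed
// 30-second back-off window has elapsed. RunState is trimmed to two fields
// and the control flow is flattened for illustration.
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;

#[derive(Default)]
struct RunState {
    awaiting_content_blobs: usize,
    rate_limited: bool,
}

#[tokio::main]
async fn main() {
    let state = Arc::new(Mutex::new(RunState::default()));

    // Another task would normally bump this while paging through blobs.
    state.lock().await.awaiting_content_blobs += 1;

    // Throttle reported: raise the flag and remember when the back-off began.
    state.lock().await.rate_limited = true;
    let backoff_started = Instant::now();

    // On later iterations the flag is released once 30 seconds have passed.
    if backoff_started.elapsed() >= Duration::from_secs(30) {
        state.lock().await.rate_limited = false;
    }

    let s = state.lock().await;
    println!("awaiting={} rate_limited={}", s.awaiting_content_blobs, s.rate_limited);
}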
- config.stats_tx.send((stats.blobs_found, stats.blobs_successful, stats.blobs_retried, - stats.blobs_error)).await.unwrap(); + let stats = state.lock().await.stats.clone(); + sleep(Duration::from_secs(3)).await; + config.stats_tx.send(( + stats.blobs_found, + stats.blobs_successful, + stats.blobs_retried, + stats.blobs_error)).await.unwrap(); } + +async fn check_done(state: &mut Arc>) -> bool { + let types = state.lock().await.awaiting_content_types; + let blobs = state.lock().await.awaiting_content_blobs; + types == 0 && blobs == 0 +} \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index 2eddd50..117b010 100644 --- a/src/config.rs +++ b/src/config.rs @@ -37,14 +37,14 @@ impl Config { } for content_type in self.collect.content_types.get_content_type_strings() { runs.insert(content_type.clone(), vec!()); - let mut start_time = end_time - chrono::Duration::try_hours(hours_to_collect) + let mut start_time = end_time - chrono::Duration::try_hours(hours_to_collect) .unwrap(); while end_time - start_time > chrono::Duration::try_hours(24).unwrap() { let split_end_time = start_time + chrono::Duration::try_hours(24) .unwrap(); let formatted_start_time = start_time.format("%Y-%m-%dT%H:%M:%SZ").to_string(); - let formatted_end_time = end_time.format("%Y-%m-%dT%H:%M:%SZ").to_string(); + let formatted_end_time = split_end_time.format("%Y-%m-%dT%H:%M:%SZ").to_string(); runs.get_mut(&content_type).unwrap().push((formatted_start_time, formatted_end_time)); start_time = split_end_time; } @@ -144,6 +144,7 @@ pub struct CollectSubConfig { #[serde(rename = "skipKnownLogs")] pub skip_known_logs: Option, pub filter: Option, + pub duplicate: Option, } #[derive(Deserialize, Copy, Clone, Debug)] pub struct ContentTypesSubConfig { diff --git a/src/data_structures.rs b/src/data_structures.rs index 2f3d5d7..1e6958b 100644 --- a/src/data_structures.rs +++ b/src/data_structures.rs @@ -12,7 +12,7 @@ pub type ArbitraryJson = HashMap; pub type JsonList = Vec; -#[derive(Default, Clone)] +#[derive(Default, Clone, Debug)] pub struct Caches { pub general: JsonList, pub aad: JsonList, @@ -81,7 +81,7 @@ pub struct AuthResult { /// Representation of content we need to retrieve. ID, expiration and content type are passed to /// python along with the retrieved content. ID an expiration are needed for avoiding known logs, /// content type for categorization in outputs. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ContentToRetrieve { pub content_type: String, pub content_id: String, @@ -109,6 +109,7 @@ pub struct GetBlobConfig { pub blob_error_tx: Sender<(String, String)>, pub content_tx: Sender, pub threads: usize, + pub duplicate: usize } @@ -140,24 +141,23 @@ pub struct MessageLoopConfig { /// These stats to show to end-user. +#[derive(Default, Copy, Clone, Debug)] pub struct RunStatistics { pub blobs_found: usize, pub blobs_successful: usize, pub blobs_error: usize, pub blobs_retried: usize, } -impl RunStatistics { - pub fn new() -> RunStatistics { - RunStatistics { - blobs_found: 0, - blobs_successful: 0, - blobs_error: 0, - blobs_retried: 0 - } - } -} +#[derive(Default, Clone)] +pub struct RunState { + pub awaiting_content_types: usize, + pub awaiting_content_blobs: usize, + pub stats: RunStatistics, + pub rate_limited: bool, +} + #[derive(Parser, Debug, Clone)] #[command(version, about, long_about = None)] /// Collect audit logs from Office Management APIs. @@ -166,33 +166,24 @@ impl RunStatistics { /// collection options (check the examples folder in the repo). 
Then run the tool with below options. pub struct CliArgs { - #[arg(long)] + #[arg(long, help = "ID of tenant to retrieve logs for.")] pub tenant_id: String, - #[arg(long)] + #[arg(long, help = "Client ID of app registration used to retrieve logs.")] pub client_id: String, - #[arg(long)] + #[arg(long, help = "Secret key of app registration used to retrieve logs")] pub secret_key: String, - #[arg(short, long, default_value = "12345678-1234-1234-1234-123456789123")] + #[arg(short, long, default_value = "12345678-1234-1234-1234-123456789123", help = "Publisher ID, set to tenant-id if left empty.")] pub publisher_id: String, - #[arg(long)] + #[arg(long, help = "Path to mandatory config file.")] pub config: String, - #[arg(short, long, default_value = "")] - pub table_string: String, - - #[arg(short, long, default_value = "")] - pub blob_string: String, - - #[arg(short, long, default_value = "")] - pub sql_string: String, - - #[arg(short, long, default_value = "")] + #[arg(short, long, default_value = "", help = "Shared key for Azure Log Analytics Workspace.")] pub oms_key: String, - #[arg(short, long, required = false)] - pub interactive_subscriber: bool, + #[arg(short, long, required = false, help = "Interactive interface for (load) testing.")] + pub interactive: bool, } diff --git a/src/interactive_mode/interactive.rs b/src/interactive_mode/interactive.rs new file mode 100644 index 0000000..62d0e0f --- /dev/null +++ b/src/interactive_mode/interactive.rs @@ -0,0 +1,941 @@ +use std::cmp::max; +use std::sync::Arc; +use color_eyre::eyre::Result; +use crossterm::event::KeyCode::Char; +use crossterm::event::KeyCode; +use log::{error, Level, warn}; +use ratatui::{Frame, widgets::*}; +use ratatui::layout::{Constraint, Direction, Layout, Size}; +use ratatui::prelude::*; +use ratatui::style::Color; +use ratatui::widgets::ListItem; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use anyhow::Result as AnyHowResult; +use ratatui::style::palette::tailwind; +use reqwest::header::HeaderMap; +use tokio::sync::Mutex; +use tokio::time::Instant; +use tui_scrollview::{ScrollView, ScrollViewState}; +use crate::api_connection::ApiConnection; +use crate::collector::Collector; +use crate::config::Config; +use crate::data_structures::{CliArgs, RunState}; +use crate::interactive_mode::tui; +use crate::interactive_mode::tui::Action; + + +#[derive(Eq, PartialEq, Copy, Clone)] +enum SelectedBlock { + Commands, + Subscriptions, + Logs, + Results, +} + + +#[derive(Clone)] +struct State { + args: CliArgs, + config: Config, + logs: Vec<(String, Level)>, + results: Vec>, + action_tx: UnboundedSender, + interface_tx: UnboundedSender>, + should_quit: bool, + api_connected: bool, + general: bool, + exchange: bool, + sharepoint: bool, + aad: bool, + dlp: bool, + selected_block: SelectedBlock, + selected_list: usize, + selected_list_max: usize, + scroll_log: ScrollViewState, + table_result: TableState, + table_result_colum_start: usize, + found_blobs: usize, + successful_blobs: usize, + awaiting_blobs: usize, + error_blobs: usize, + retry_blobs: usize, + logs_retrieved: usize, + logs_retrieval_speeds: Vec<(f64, f64)>, + run_started: Option, + run_ended: Option, + run_progress: u16, + rate_limit: bool, +} +impl State { + pub fn new(args: CliArgs, + config: Config, + action_tx: UnboundedSender, + interface_tx: UnboundedSender> + ) -> Self { + Self { + args, + config, + action_tx, + interface_tx, + logs: Vec::new(), + results: Vec::new(), + should_quit: false, + api_connected: false, + general: false, 
+ exchange: false, + sharepoint: false, + aad: false, + dlp: false, + selected_block: SelectedBlock::Commands, + selected_list: 0, + selected_list_max: 2, + scroll_log: ScrollViewState::default(), + table_result: TableState::default(), + table_result_colum_start: 0, + found_blobs: 0, + error_blobs: 0, + successful_blobs: 0, + awaiting_blobs: 0, + retry_blobs: 0, + logs_retrieved: 0, + logs_retrieval_speeds: Vec::new(), + run_started: None, + run_ended: None, + run_progress: 0, + rate_limit: false, + } + } +} + +pub async fn run(args: CliArgs, config: Config, mut log_rx: UnboundedReceiver<(String, Level)>) -> Result<()> { + let (action_tx, mut action_rx) = unbounded_channel(); + let (interface_tx, mut interface_rx) = unbounded_channel(); + let mut tui = tui::Tui::new()?.tick_rate(1.0).frame_rate(30.0); + tui.enter()?; + + let mut state = State::new(args, config, action_tx.clone(), interface_tx); + let api = Arc::new(Mutex::new( + ApiConnection { args: state.args.clone(), config: state.config.clone(), headers: HeaderMap::new() })); + + loop { + let e = tui.next().await.unwrap(); + match e { + tui::Event::Tick => action_tx.send(Action::Tick)?, + tui::Event::Render => action_tx.send(Action::Render)?, + tui::Event::Key(_) => { + let action = get_action(&state, e); + action_tx.send(action.clone())?; + } + _ => {} + }; + + while let Ok(log) = log_rx.try_recv() { + if state.logs.len() == 1000 { + state.logs.remove(0); + } + state.logs.push(log); + } + while let Ok(result) = interface_rx.try_recv() { + if state.results.len() == 1000 { + state.results.remove(0); + } + state.results.push(result); + } + while let Ok(action) = action_rx.try_recv() { + // application update + update(&mut state, action.clone(), api.clone()); + // render only when we receive Action::Render + if let Action::Render = action { + tui.draw(|f| { + ui(f, &mut state); + })?; + } + } + + // application exit + if state.should_quit { + break; + } + } + tui.exit()?; + + Ok(()) +} + +fn get_action(_state: &State, event: tui::Event) -> Action { + match event { + tui::Event::Error => Action::None, + tui::Event::Tick => Action::Tick, + tui::Event::Render => Action::Render, + tui::Event::Key(key) => { + match key.code { + Char('q') => Action::Quit, + Char('c') => Action::GoToCommand, + Char('l') => Action::GoToLogs, + Char('s') => Action::GoToSubscriptions, + Char('r') => Action::GoToResults, + KeyCode::Enter => Action::HandleEnter, + KeyCode::Up => Action::HandleUp, + KeyCode::Down => Action::HandleDown, + KeyCode::Left => Action::HandleLeft, + KeyCode::Right => Action::HandleRight, + KeyCode::PageUp => Action::ScrollPageUp, + KeyCode::PageDown => Action::ScrollPageDown, + _ => Action::None, + } + }, + _ => Action::None, + } +} + +fn update(state: &mut State, action: Action, api: Arc>) { + + match action { + Action::Quit => { + state.should_quit = true; + } + Action::GoToCommand => { + state.selected_block = SelectedBlock::Commands; + state.selected_list = 0; + state.selected_list_max = 2; + } + Action::GoToSubscriptions => { + state.selected_block = SelectedBlock::Subscriptions; + state.selected_list = 0; + state.selected_list_max = 4; + } + Action::GoToLogs => { + state.selected_block = SelectedBlock::Logs; + } + Action::GoToResults => { + state.selected_block = SelectedBlock::Results; + } + Action::HandleEnter => { + let new_state = state.clone(); + tokio::spawn(async move { + handle_enter(new_state, api).await; + }); + } + Action::HandleUp => { + if state.selected_block == SelectedBlock::Logs { + state.scroll_log.scroll_up() + } 
else if state.selected_block == SelectedBlock::Results{ + if let Some(index) = state.table_result.selected() { + if index > 0 { + state.table_result.select(Some(index - 1)); + } + } + } else { + state.selected_list = state.selected_list.saturating_sub(1); + } + } + Action::HandleDown => { + if state.selected_block == SelectedBlock::Logs { + state.scroll_log.scroll_down() + } else if state.selected_block == SelectedBlock::Results{ + if let Some(index) = state.table_result.selected() { + if index < 1000 { + state.table_result.select(Some(index + 1)); + } + } else { + state.table_result.select(Some(0)) + } + } else { + state.selected_list = if state.selected_list >= state.selected_list_max { + state.selected_list + } else { + state.selected_list + 1 + } + } + }, + Action::HandleLeft => { + if state.selected_block == SelectedBlock::Commands && state.selected_list == 2 { + let mut current = state.config.collect.duplicate.unwrap_or(1); + current = current.saturating_sub(max(1, current / 10)); + current = max(1, current); + state.config.collect.duplicate = Some(current); + } else if state.selected_block == SelectedBlock::Results { + if state.table_result_colum_start > 0 { + state.table_result_colum_start -= 1; + } + } + }, + Action::HandleRight => { + if state.selected_block == SelectedBlock::Commands && state.selected_list == 2 { + let current = state.config.collect.duplicate.unwrap_or(1); + let increase = max(1, current / 10); + state.config.collect.duplicate = Some(current + increase); + } else if state.selected_block == SelectedBlock::Results { + let mut max_col = state.results.first().unwrap_or(&Vec::new()).len(); + if max_col > 10 { + max_col -= 10; + } + if state.table_result_colum_start < max_col { + state.table_result_colum_start += 1; + } + } + }, + Action::ScrollPageUp => { + if state.selected_block == SelectedBlock::Logs { + state.scroll_log.scroll_page_up() + } + } + Action::ScrollPageDown => { + if state.selected_block == SelectedBlock::Logs { + state.scroll_log.scroll_page_down() + } + } + Action::UpdateFoundBlobs(found) => { + state.found_blobs = found; + } + Action::UpdateSuccessfulBlobs(found) => { + state.successful_blobs = found; + } + Action::UpdateRetryBlobs(found) => { + state.retry_blobs = found; + } + Action::UpdateErrorBlobs(found) => { + state.error_blobs = found; + } + Action::UpdateAwaitingBlobs(found) => { + state.awaiting_blobs = found; + } + Action::LogsRetrieved(found) => { + state.logs_retrieved = found; + } + Action::LogsRetrievedSpeed(found) => { + state.logs_retrieval_speeds.push(found); + } + Action::RunProgress(found) => { + state.run_progress = found; + } + Action::RunStarted => { + state.run_started = Some(Instant::now()); + state.logs_retrieval_speeds.clear(); + } + Action::RunEnded => { + state.run_ended = Some(Instant::now()); + } + Action::RateLimited => { + state.rate_limit = true; + } + Action::NotRateLimited => { + state.rate_limit = false; + } + Action::ConnectApi => { + state.api_connected = true; + }, + Action::DisconnectApi => { + state.api_connected = false; + }, + Action::EnableSubscriptionGeneral => { + state.general = true; + }, + Action::DisableSubscriptionGeneral => { + state.general = false; + }, + Action::EnableSubscriptionAad => { + state.aad = true; + }, + Action::DisableSubscriptionAad => { + state.aad = false; + }, + Action::EnableSubscriptionExchange => { + state.exchange = true; + }, + Action::DisableSubscriptionExchange => { + state.exchange = false; + }, + Action::EnableSubscriptionSharePoint => { + state.sharepoint = true; + }, 
+ Action::DisableSubscriptionSharePoint => { + state.sharepoint = false; + }, + Action::EnableSubscriptionDlp => { + state.dlp = true; + }, + Action::DisableSubscriptionDlp => { + state.dlp = false; + }, + _ => (), // TODO + } +} + +fn ui(frame: &mut Frame, state: &mut State) { + + // Layouts + let vertical = Layout::default() + .direction(Direction::Vertical) + .constraints([ + Constraint::Length(10), + Constraint::Length(7), + Constraint::Length(1), + Constraint::Length(10), + Constraint::Length(1), + Constraint::Length(10), + ]) + .split(frame.size()); + + let horizontal_0 = Layout::default() + .direction(Direction::Horizontal) + .constraints([ + Constraint::Length(60), + Constraint::Length(60), + Constraint::Min(1), + ]) + .split(vertical[0]); + + let horizontal_1 = Layout::default() + .direction(Direction::Horizontal) + .constraints([ + Constraint::Length(60), + Constraint::Length(60), + Constraint::Min(1), + ]) + .split(vertical[1]); + + let horizontal_2 = Layout::default() + .direction(Direction::Horizontal) + .constraints([ + Constraint::Length(frame.size().width), + Constraint::Min(1), + ]) + .split(vertical[2]); + + let horizontal_3 = Layout::default() + .direction(Direction::Horizontal) + .constraints([ + Constraint::Length(frame.size().width), + Constraint::Min(1), + ]) + .split(vertical[3]); + + let horizontal_4 = Layout::default() + .direction(Direction::Horizontal) + .constraints([ + Constraint::Length(frame.size().width), + Constraint::Min(1), + ]) + .split(vertical[4]); + + let horizontal_5 = Layout::default() + .direction(Direction::Horizontal) + .constraints([ + Constraint::Length(frame.size().width), + Constraint::Min(1), + ]) + .split(vertical[5]); + + // Connection + let settings_block = Block::default() + .title(block::Title::from("Connection").alignment(Alignment::Center)) + .borders(Borders::ALL); + + let mut settings_list_items = Vec::::new(); + + settings_list_items.push(ListItem::new(Line::from(Span::styled( + format!(" Tenant ID: {}", state.args.tenant_id), Style::default().fg( + if state.args.tenant_id.is_empty() { Color::Red } else { Color::Green }), + )))); + settings_list_items.push(ListItem::new(Line::from(Span::styled( + format!(" Client ID: {}", state.args.client_id), Style::default().fg( + if state.args.client_id.is_empty() { Color::Red } else { Color::Green }), + )))); + + let secret_string = if state.args.secret_key.is_empty() { + "Secret Key:".to_string() + } else { + format!(" Secret Key: {}{}", + state.args.secret_key.clone().split_off(state.args.secret_key.len() - 5), + "*".repeat(state.args.secret_key.len() - 5)) + }; + settings_list_items.push(ListItem::new(Line::from(Span::styled( + secret_string, Style::default().fg( + if state.args.secret_key.is_empty() { Color::Red } else { Color::Green }), + )))); + settings_list_items.push(ListItem::new(Line::from(Span::styled( + format!(" Config: {}", state.args.config), Style::default().fg( + if state.args.config.is_empty() { Color::Red } else { Color::Green }), + )))); + + let settings_list = List::new(settings_list_items) + .block(settings_block.clone()); + frame.render_widget(settings_list, horizontal_0[0]); + + // Commands + let command_block_style = match state.selected_block { + SelectedBlock::Commands => Style::new().underlined(), + _ => Style::new(), + }; + let commands_block = Block::new() + .title_style(command_block_style) + .title(" Commands") + .title_alignment(Alignment::Center) + .borders(Borders::ALL); + + let mut commands_list_items = Vec::::new(); + + 
commands_list_items.push(ListItem::new(Line::from(Span::styled( + "Test API connection", Style::default().fg(Color::Magenta), + )))); + commands_list_items.push(ListItem::new(Line::from(Span::styled( + "Run Collector (using specified config)", Style::default().fg(Color::Magenta), + )))); + let duplicate = state.config.collect.duplicate.unwrap_or(1); + commands_list_items.push(ListItem::new(Line::from(Span::styled( + format!("< Load test ({}x) > (use arrow keys to increase load)", duplicate), Style::default().fg(Color::Magenta), + )))); + + let mut command_state = ListState::default().with_selected(Some(state.selected_list)); + if state.selected_block == SelectedBlock::Commands { + StatefulWidget::render( + List::new(commands_list_items) + .style(Style::new()) + .highlight_style(Style::new().on_yellow()) + .highlight_symbol(">>") + .block(commands_block), + horizontal_0[1], + frame.buffer_mut(), + &mut command_state, + ); + } else { + StatefulWidget::render( + List::new(commands_list_items) + .style(Style::new()) + .highlight_symbol(" ") + .block(commands_block), + horizontal_0[1], + frame.buffer_mut(), + &mut command_state, + ); + } + + // Speed chart + let chart_block = Block::default() + .title(block::Title::from("Performance").alignment(Alignment::Center)) + .borders(Borders::ALL); + let datasets = vec![ + Dataset::default() + .name("Logs per second") + .marker(Marker::Braille) + .graph_type(GraphType::Line) + .style(Style::default().magenta()) + .data(state.logs_retrieval_speeds.as_slice()), + ]; + + let x_axis = Axis::default() + .style(Style::default().white()) + .bounds([0.0, state.logs_retrieval_speeds.last().unwrap_or(&(10.0, 0.0)).0]) + .labels(vec![]); + + let top_speed = state.logs_retrieval_speeds + .iter() + .map(|(_, s)| *s as usize) + .max() + .unwrap_or(15); + let y_labels = vec!( + Span::from((top_speed / 3).to_string()), + Span::from(((top_speed / 3) * 2).to_string()), + Span::from(top_speed.to_string()) + ); + let y_axis = Axis::default() + .title("Logs per second".red()) + .style(Style::default().white()) + .bounds([0.0, top_speed as f64]) + .labels(y_labels); + + let chart = Chart::new(datasets) + .block(chart_block) + .x_axis(x_axis) + .y_axis(y_axis); + + frame.render_widget(chart, horizontal_0[2]); + + // Subscriptions + let subscription_block_style = match state.selected_block { + SelectedBlock::Subscriptions => Style::new().underlined(), + _ => Style::new(), + }; + let subscription_block = Block::new() + .title(" Feed subscriptions") + .title_alignment(Alignment::Center) + .title_style(subscription_block_style) + .borders(Borders::ALL); + + let mut subscription_list_items = Vec::::new(); + subscription_list_items.push(ListItem::new(Line::from(Span::styled( + format!("Audit.General active: {}", + if state.api_connected {state.general.to_string()} else { "Not connected".to_string() }), + Style::default().fg(color_from_bool(state.general)), + )))); + subscription_list_items.push(ListItem::new(Line::from(Span::styled( + format!("Audit.AzureActiveDirectory active: {}", + if state.api_connected {state.aad.to_string()} else { "Not connected".to_string() }), + Style::default().fg(color_from_bool(state.aad)), + )))); + subscription_list_items.push(ListItem::new(Line::from(Span::styled( + format!("Audit.Exchange active: {}", + if state.api_connected {state.exchange.to_string()} else { "Not connected".to_string() }), + Style::default().fg(color_from_bool(state.exchange)), + )))); + subscription_list_items.push(ListItem::new(Line::from(Span::styled( + 
format!("Audit.Sharepoint active: {}", + if state.api_connected {state.sharepoint.to_string()} else { "Not connected".to_string() }), + Style::default().fg(color_from_bool(state.sharepoint)), + )))); + subscription_list_items.push(ListItem::new(Line::from(Span::styled( + format!("DLP.All active: {}", + if state.api_connected {state.dlp.to_string()} else { "Not connected".to_string() }), + Style::default().fg(color_from_bool(state.dlp)), + )))); + let mut list_state = ListState::default().with_selected(Some(state.selected_list)); + if state.selected_block == SelectedBlock::Subscriptions { + StatefulWidget::render( + List::new(subscription_list_items) + .style(Style::new()) + .highlight_style(Style::new().on_yellow()) + .highlight_symbol(">>") + .block(subscription_block), + horizontal_1[0], + frame.buffer_mut(), + &mut list_state, + ); + } else { + StatefulWidget::render( + List::new(subscription_list_items) + .style(Style::new()) + .highlight_symbol(" ") + .block(subscription_block), + horizontal_1[0], + frame.buffer_mut(), + &mut list_state, + ); + } + + // Status + let status_block = Block::new() + .title("Blobs") + .title_alignment(Alignment::Center) + .title_style(Style::new()) + .borders(Borders::ALL); + + let highest = *[state.found_blobs, state.successful_blobs, state.retry_blobs, state.error_blobs] + .iter() + .max() + .unwrap(); + let bar = BarChart::default() + .block(Block::default().title("Run stats").borders(Borders::ALL)) + .bar_width(10) + .data(BarGroup::default().bars(&[Bar::default().value(state.found_blobs as u64).style(Style::default().fg(Color::Blue)).label(Line::from("Found"))])) + .data(BarGroup::default().bars(&[Bar::default().value(state.successful_blobs as u64).style(Style::default().fg(Color::Green)).label(Line::from("Retrieved"))])) + .data(BarGroup::default().bars(&[Bar::default().value(state.retry_blobs as u64).style(Style::default().fg(Color::Yellow)).label(Line::from("Retried"))])) + .data(BarGroup::default().bars(&[Bar::default().value(state.error_blobs as u64).style(Style::default().fg(Color::Red)).label(Line::from("Error"))])) + .max(max(highest as u64, 10)) + .block(status_block); + + frame.render_widget(bar, horizontal_1[1]); + + // Progress + let progress_block = Block::new() + .title("Progress") + .title_alignment(Alignment::Center) + .title_style(Style::new()) + .borders(Borders::ALL); + let mut progress_list_items = Vec::::new(); + + let (connect_string, color) = if state.api_connected { + (" API Connection: Connected".to_string(), Color::Green,) + } else { + (" API Connection: Disconnected".to_string(), Color::Red,) + }; + progress_list_items.push(ListItem::new(Line::from(Span::styled( + connect_string, Style::default().fg(color), + )))); + + if state.rate_limit { + progress_list_items.push(ListItem::new(Line::from(Span::styled( + " Being rate limited!", Style::default().fg(Color::Red).rapid_blink(), + )))); + } else { + progress_list_items.push(ListItem::new(Line::from(Span::styled( + " Not rate limited", Style::default().fg(Color::Green), + )))); + } + + let elapsed = if let Some(elapsed) = state.run_started { + + let end = state.run_ended.unwrap_or(Instant::now()); + let total = end.duration_since(elapsed).as_secs(); + let minutes = total / 60; + let seconds = total % 60; + format!("{}{}:{}{}", + if minutes < 10 { "0" } else { "" }, + minutes, + if seconds < 10 { "0" } else { "" }, + seconds, + ) + } else { + " Not started".to_string() + }; + progress_list_items.push(ListItem::new(Line::from(Span::styled( + format!(" Time elapsed: {}", elapsed), 
Style::default().fg(Color::LightBlue), + )))); + + progress_list_items.push(ListItem::new(Line::from(Span::styled( + format!(" Blobs remaining: {}", state.awaiting_blobs), Style::default().fg(Color::LightBlue), + )))); + + let progress_list = List::new(progress_list_items) + .style(Style::new()) + .highlight_symbol(" ") + .block(progress_block); + frame.render_widget(progress_list, horizontal_1[2]); + + // Logs + let mut logs_list_items = Vec::::new(); + for (log, level) in state.logs.iter() { + logs_list_items.push(ListItem::new(Line::from(Span::styled( + log, Style::default().fg(color_from_level(level)), + )))); + } + let list_wid = List::new(logs_list_items) + .style(Style::new()) + .highlight_symbol(" "); + let size = Size::new(1000, 1000); + let mut scroll_view = ScrollView::new(size); + let area = Rect::new(0, 0, 1000 , 1000); + scroll_view.render_widget(list_wid, area); + + let palette = tailwind::SLATE; + let (fg, bg) = if state.selected_block == SelectedBlock::Logs { + (palette.c900, Color::Yellow) + } else { + (palette.c900, palette.c300) + }; + let keys_fg = palette.c50; + let keys_bg = palette.c600; + let title = Line::from(vec![ + " Logs ".into(), + "| ↓ | ↑ | PageDown | PageUp | " + .fg(keys_fg) + .bg(keys_bg), + ]) + .style((fg, bg)).centered(); + frame.render_widget(title, horizontal_2[0]); + frame.render_stateful_widget(scroll_view, horizontal_3[0], &mut state.scroll_log); + + // Results + let mut results = state.results.clone(); + let mut header = if !results.is_empty() { results.remove(0) } else { Vec::new() }; + if header.len() > 10 { + header = header[state.table_result_colum_start..state.table_result_colum_start + 10].to_vec(); + } + let rows: Vec = results + .clone() + .into_iter() + .map(|mut x|{ + x = x[state.table_result_colum_start..state.table_result_colum_start + 10].to_vec(); + Row::new(x) + }) + .collect(); + let table = Table::default() + .rows(rows) + .highlight_style(Style::new().add_modifier(Modifier::REVERSED)) + .highlight_symbol(">>") + .header(Row::new(header) + .style(Style::new().bold().underlined()) + .bottom_margin(1), + ); + + + let palette = tailwind::SLATE; + let (fg, bg) = if state.selected_block == SelectedBlock::Results { + (palette.c900, Color::Yellow) + } else { + (palette.c900, palette.c300) + }; + let keys_fg = palette.c50; + let keys_bg = palette.c600; + let title = Line::from(vec![ + " Results ".into(), + " ↓ | ↑ | ← | → | " + .fg(keys_fg) + .bg(keys_bg), + ]) + .style((fg, bg)).centered(); + frame.render_widget(title, horizontal_4[0]); + frame.render_stateful_widget(table, horizontal_5[0], &mut state.table_result); + +} + +fn color_from_bool(val: bool) -> Color { + return if val { + Color::Green + } else { + Color::Red + } +} +fn color_from_level(level: &Level) -> Color { + match level { + &Level::Trace => Color::Magenta, + &Level::Debug => Color::White, + &Level::Info => Color::LightBlue, + &Level::Warn => Color::Yellow, + &Level::Error => Color::Red, + } +} +async fn handle_enter(state: State, api: Arc>) { + if let Err(e) = match state.selected_block { + SelectedBlock::Commands => handle_enter_command(state, api).await, + SelectedBlock::Subscriptions => handle_enter_subscription(state, api).await, + _ => Ok(()), + } { + error!("Error connecting to API: {}", e); + }; +} + +async fn handle_enter_command(state: State, api: Arc>) -> AnyHowResult<()>{ + match state.selected_list { + 0 => handle_enter_command_connect(state, api).await?, + 1 => handle_enter_command_run(state, false, api).await?, + 2 => handle_enter_command_run(state, true, 
api).await?, + _ => warn!("Invalid list choice"), + } + Ok(()) +} + +async fn handle_enter_command_connect(state: State, api: Arc>) -> AnyHowResult<()> { + + state.action_tx.send(Action::DisconnectApi).unwrap(); + if api.lock().await.headers.is_empty() { + api.lock().await.login().await?; + } + state.action_tx.send(Action::ConnectApi).unwrap(); + update_subscriptions(state, api).await?; + Ok(()) +} + +async fn handle_enter_command_run(state: State, + load_test: bool, + api: Arc>) + -> AnyHowResult<()> { + + let args = state.args.clone(); + let mut config = state.config.clone(); + if !load_test { + config.collect.duplicate = Some(1); + } else { + config.collect.skip_known_logs = Some(false); + } + let runs = config.get_needed_runs(); + let run_state = Arc::new(Mutex::new(RunState::default())); + + handle_enter_command_connect(state.clone(), api).await?; + let mut collector = Collector::new(args, + config, + runs, + run_state.clone(), + Some(state.interface_tx.clone())).await?; + state.action_tx.send(Action::RunStarted).unwrap(); + let mut elapsed_since_data_point = Instant::now(); + let run_start = elapsed_since_data_point.clone(); + let mut logs_retrieved: usize = 0; + let mut rate_limited = false; + loop { + let stats = run_state.lock().await.stats; + state.action_tx.send(Action::UpdateAwaitingBlobs(run_state.lock().await.awaiting_content_blobs)).unwrap(); + state.action_tx.send(Action::UpdateFoundBlobs(stats.blobs_found)).unwrap(); + state.action_tx.send(Action::UpdateFoundBlobs(stats.blobs_found)).unwrap(); + state.action_tx.send(Action::UpdateSuccessfulBlobs(stats.blobs_successful)).unwrap(); + state.action_tx.send(Action::UpdateErrorBlobs(stats.blobs_error)).unwrap(); + state.action_tx.send(Action::UpdateRetryBlobs(stats.blobs_retried)).unwrap(); + + if !rate_limited && run_state.lock().await.rate_limited { + rate_limited = true; + state.action_tx.send(Action::RateLimited).unwrap(); + } else if rate_limited && !run_state.lock().await.rate_limited { + rate_limited = false; + state.action_tx.send(Action::NotRateLimited).unwrap(); + } + + let progress = if stats.blobs_found > 0 { + ((stats.blobs_found - stats.blobs_successful) / stats.blobs_found) * 100 + } else { + 0 + }; + state.action_tx.send(Action::RunProgress(progress as u16)).unwrap(); + + logs_retrieved += collector.check_results().await; + let done = collector.check_stats().await; + if done { + logs_retrieved += collector.check_all_results().await; + state.action_tx.send(Action::RunEnded).unwrap(); + state.action_tx.send(Action::RunProgress(100)).unwrap(); + state.action_tx.send(Action::LogsRetrieved(logs_retrieved)).unwrap(); + state.action_tx.send(Action::UpdateFoundBlobs(stats.blobs_found)).unwrap(); + state.action_tx.send(Action::UpdateSuccessfulBlobs(stats.blobs_successful)).unwrap(); + state.action_tx.send(Action::UpdateErrorBlobs(stats.blobs_error)).unwrap(); + state.action_tx.send(Action::UpdateRetryBlobs(stats.blobs_retried)).unwrap(); + break + } + state.action_tx.send(Action::LogsRetrieved(logs_retrieved)).unwrap(); + let since_last_data_point = elapsed_since_data_point.elapsed().as_secs(); + if since_last_data_point >= 1 { + let t = run_start.elapsed().as_secs() as f64; + let speed = logs_retrieved as f64 / t; + for _ in 0..since_last_data_point { + state.action_tx.send(Action::LogsRetrievedSpeed((t, speed))).unwrap(); + } + elapsed_since_data_point = Instant::now(); + } + } + Ok(()) +} + +async fn handle_enter_subscription(state: State, api: Arc>) -> AnyHowResult<()> { + + if api.lock().await.headers.is_empty() { + 
api.lock().await.login().await?; + } + match state.selected_list { + 0 => api.lock().await.set_subscription("Audit.General".to_string(), !state.general).await, + 1 => api.lock().await.set_subscription("Audit.AzureActiveDirectory".to_string(), !state.aad).await, + 2 => api.lock().await.set_subscription("Audit.Exchange".to_string(), !state.exchange).await, + 3 => api.lock().await.set_subscription("Audit.SharePoint".to_string(), !state.sharepoint).await, + 4 => api.lock().await.set_subscription("DLP.All".to_string(), !state.dlp).await, + _ => panic!(), + }?; + update_subscriptions(state, api).await?; + Ok(()) +} + +async fn update_subscriptions(state: State, api: Arc>) -> AnyHowResult<()> { + let subscriptions = api.lock().await.get_feeds().await?; + if subscriptions.contains(&"Audit.General".to_string()) { + state.action_tx.send(Action::EnableSubscriptionGeneral).unwrap() + } else { + state.action_tx.send(Action::DisableSubscriptionGeneral).unwrap() + } + if subscriptions.contains(&"Audit.AzureActiveDirectory".to_string()) { + state.action_tx.send(Action::EnableSubscriptionAad).unwrap() + } else { + state.action_tx.send(Action::DisableSubscriptionAad).unwrap() + } + if subscriptions.contains(&"Audit.Exchange".to_string()) { + state.action_tx.send(Action::EnableSubscriptionExchange).unwrap() + } else { + state.action_tx.send(Action::DisableSubscriptionExchange).unwrap() + } + if subscriptions.contains(&"Audit.SharePoint".to_string()) { + state.action_tx.send(Action::EnableSubscriptionSharePoint).unwrap() + } else { + state.action_tx.send(Action::DisableSubscriptionSharePoint).unwrap() + } + if subscriptions.contains(&"DLP.All".to_string()) { + state.action_tx.send(Action::EnableSubscriptionDlp).unwrap() + } else { + state.action_tx.send(Action::DisableSubscriptionDlp).unwrap() + } + Ok(()) +} diff --git a/src/interactive_mode/mod.rs b/src/interactive_mode/mod.rs new file mode 100644 index 0000000..08fb11b --- /dev/null +++ b/src/interactive_mode/mod.rs @@ -0,0 +1,2 @@ +pub mod interactive; +mod tui; \ No newline at end of file diff --git a/src/interactive_mode/tui.rs b/src/interactive_mode/tui.rs new file mode 100644 index 0000000..d31254a --- /dev/null +++ b/src/interactive_mode/tui.rs @@ -0,0 +1,251 @@ +use std::{ + ops::{Deref, DerefMut}, + time::Duration, +}; + +use color_eyre::eyre::Result; +use crossterm::{ + cursor, + event::{ + DisableBracketedPaste, DisableMouseCapture, EnableBracketedPaste, EnableMouseCapture, Event as CrosstermEvent, + KeyEvent, KeyEventKind, MouseEvent, + }, + terminal::{EnterAlternateScreen, LeaveAlternateScreen}, +}; +use futures::{FutureExt, StreamExt}; +use ratatui::backend::CrosstermBackend as Backend; +use tokio::{ + sync::mpsc::{self, UnboundedReceiver, UnboundedSender}, + task::JoinHandle, +}; +use tokio_util::sync::CancellationToken; + +#[derive(Clone, Debug)] +pub enum Event { + Init, + Error, + Tick, + Render, + FocusGained, + FocusLost, + Paste(String), + Key(KeyEvent), + Mouse(MouseEvent), + Resize(u16, u16), +} + +#[derive(Copy, Clone)] +pub enum Action { + Tick, + GoToCommand, + GoToSubscriptions, + GoToLogs, + GoToResults, + HandleUp, + HandleDown, + HandleLeft, + HandleRight, + HandleEnter, + EnableSubscriptionGeneral, + DisableSubscriptionGeneral, + EnableSubscriptionAad, + DisableSubscriptionAad, + EnableSubscriptionSharePoint, + DisableSubscriptionSharePoint, + EnableSubscriptionExchange, + DisableSubscriptionExchange, + EnableSubscriptionDlp, + DisableSubscriptionDlp, + DisconnectApi, + ConnectApi, + ScrollPageUp, + ScrollPageDown, + 
UpdateFoundBlobs(usize), + UpdateAwaitingBlobs(usize), + UpdateSuccessfulBlobs(usize), + UpdateErrorBlobs(usize), + UpdateRetryBlobs(usize), + LogsRetrieved(usize), + LogsRetrievedSpeed((f64, f64)), + RunProgress(u16), + RunStarted, + RunEnded, + RateLimited, + NotRateLimited, + Quit, + Render, + None, +} + +pub struct Tui { + pub terminal: ratatui::Terminal>, + pub task: JoinHandle<()>, + pub cancellation_token: CancellationToken, + pub event_rx: UnboundedReceiver, + pub event_tx: UnboundedSender, + pub frame_rate: f64, + pub tick_rate: f64, + pub mouse: bool, + pub paste: bool, +} + +impl Tui { + pub fn new() -> Result { + let tick_rate = 4.0; + let frame_rate = 60.0; + let terminal = ratatui::Terminal::new(Backend::new(std::io::stderr()))?; + let (event_tx, event_rx) = mpsc::unbounded_channel(); + let cancellation_token = CancellationToken::new(); + let task = tokio::spawn(async {}); + let mouse = false; + let paste = false; + Ok(Self { terminal, task, cancellation_token, event_rx, event_tx, frame_rate, tick_rate, mouse, paste }) + } + + pub fn tick_rate(mut self, tick_rate: f64) -> Self { + self.tick_rate = tick_rate; + self + } + + pub fn frame_rate(mut self, frame_rate: f64) -> Self { + self.frame_rate = frame_rate; + self + } + + pub fn start(&mut self) { + let tick_delay = Duration::from_secs_f64(1.0 / self.tick_rate); + let render_delay = Duration::from_secs_f64(1.0 / self.frame_rate); + self.cancel(); + self.cancellation_token = CancellationToken::new(); + let _cancellation_token = self.cancellation_token.clone(); + let _event_tx = self.event_tx.clone(); + self.task = tokio::spawn(async move { + let mut reader = crossterm::event::EventStream::new(); + let mut tick_interval = tokio::time::interval(tick_delay); + let mut render_interval = tokio::time::interval(render_delay); + _event_tx.send(Event::Init).unwrap(); + loop { + let tick_delay = tick_interval.tick(); + let render_delay = render_interval.tick(); + let crossterm_event = reader.next().fuse(); + tokio::select! 
{ + _ = _cancellation_token.cancelled() => { + break; + } + maybe_event = crossterm_event => { + match maybe_event { + Some(Ok(evt)) => { + match evt { + CrosstermEvent::Key(key) => { + if key.kind == KeyEventKind::Press { + _event_tx.send(Event::Key(key)).unwrap(); + } + }, + CrosstermEvent::Mouse(mouse) => { + _event_tx.send(Event::Mouse(mouse)).unwrap(); + }, + CrosstermEvent::Resize(x, y) => { + _event_tx.send(Event::Resize(x, y)).unwrap(); + }, + CrosstermEvent::FocusLost => { + _event_tx.send(Event::FocusLost).unwrap(); + }, + CrosstermEvent::FocusGained => { + _event_tx.send(Event::FocusGained).unwrap(); + }, + CrosstermEvent::Paste(s) => { + _event_tx.send(Event::Paste(s)).unwrap(); + }, + } + } + Some(Err(_)) => { + _event_tx.send(Event::Error).unwrap(); + } + None => {}, + } + }, + _ = tick_delay => { + _event_tx.send(Event::Tick).unwrap(); + }, + _ = render_delay => { + _event_tx.send(Event::Render).unwrap(); + }, + } + } + }); + } + + pub fn stop(&self) -> Result<()> { + self.cancel(); + let mut counter = 0; + while !self.task.is_finished() { + std::thread::sleep(Duration::from_millis(1)); + counter += 1; + if counter > 50 { + self.task.abort(); + } + if counter > 100 { + log::error!("Failed to abort task in 100 milliseconds for unknown reason"); + break; + } + } + Ok(()) + } + + pub fn enter(&mut self) -> Result<()> { + crossterm::terminal::enable_raw_mode()?; + crossterm::execute!(std::io::stderr(), EnterAlternateScreen, cursor::Hide)?; + if self.mouse { + crossterm::execute!(std::io::stderr(), EnableMouseCapture)?; + } + if self.paste { + crossterm::execute!(std::io::stderr(), EnableBracketedPaste)?; + } + self.start(); + Ok(()) + } + + pub fn exit(&mut self) -> Result<()> { + self.stop()?; + if crossterm::terminal::is_raw_mode_enabled()? 
{ + self.flush()?; + if self.paste { + crossterm::execute!(std::io::stderr(), DisableBracketedPaste)?; + } + if self.mouse { + crossterm::execute!(std::io::stderr(), DisableMouseCapture)?; + } + crossterm::execute!(std::io::stderr(), LeaveAlternateScreen, cursor::Show)?; + crossterm::terminal::disable_raw_mode()?; + } + Ok(()) + } + + pub fn cancel(&self) { + self.cancellation_token.cancel(); + } + + pub async fn next(&mut self) -> Option { + self.event_rx.recv().await + } +} + +impl Deref for Tui { + type Target = ratatui::Terminal>; + + fn deref(&self) -> &Self::Target { + &self.terminal + } +} + +impl DerefMut for Tui { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.terminal + } +} + +impl Drop for Tui { + fn drop(&mut self) { + self.exit().unwrap(); + } +} \ No newline at end of file diff --git a/src/interfaces/file_interface.rs b/src/interfaces/file_interface.rs index bbb9cff..4966b79 100644 --- a/src/interfaces/file_interface.rs +++ b/src/interfaces/file_interface.rs @@ -70,8 +70,11 @@ impl FileInterface { columns.append(&mut get_all_columns(content_type)); } + let path = &self.config.output.file.as_ref().unwrap().path; let mut wrt = - Writer::from_path(&self.config.output.file.as_ref().unwrap().path).unwrap(); + Writer::from_path(path).unwrap_or_else( + |e| panic!("Error in CSV interface: Could not write to path '{}': {}", path, e) + ); wrt.write_record(&columns).unwrap(); for logs in all_logs.iter_mut() { for log in logs.iter_mut() { @@ -90,7 +93,9 @@ impl FileInterface { } let columns = get_all_columns(logs); let path = self.paths.get(&content_type).unwrap(); - let mut wrt = Writer::from_path(path).unwrap(); + let mut wrt = Writer::from_path(path).unwrap_or_else( + |e| panic!("Error in CSV interface: Could not write to path '{}': {}", path, e) + ); wrt.write_record(&columns).unwrap(); for log in logs { @@ -115,7 +120,7 @@ impl Interface for FileInterface { /// Get all column names in a heterogeneous collection of logs. -fn get_all_columns(logs: &[ArbitraryJson]) -> Vec { +pub fn get_all_columns(logs: &[ArbitraryJson]) -> Vec { let mut columns: Vec = Vec::new(); for log in logs.iter() { @@ -130,7 +135,7 @@ fn get_all_columns(logs: &[ArbitraryJson]) -> Vec { /// Due to heterogeneous logs not all logs have all columns. Fill missing columns of /// a log with an empty string. 
-fn fill_log(log: &ArbitraryJson, columns: &Vec) -> Vec { +pub fn fill_log(log: &ArbitraryJson, columns: &Vec) -> Vec { let mut new_log= Vec::new(); for col in columns { if !log.contains_key(col) { diff --git a/src/interfaces/interactive_interface.rs b/src/interfaces/interactive_interface.rs new file mode 100644 index 0000000..c65dc0c --- /dev/null +++ b/src/interfaces/interactive_interface.rs @@ -0,0 +1,40 @@ +use async_trait::async_trait; +use tokio::sync::mpsc::UnboundedSender; +use crate::data_structures::{Caches}; +use crate::interfaces::interface::Interface; + +pub struct InteractiveInterface { + tx_log: UnboundedSender>, +} + +impl InteractiveInterface { + + pub fn new(tx_log: UnboundedSender>) -> Self { + + let interface = InteractiveInterface { + tx_log, + }; + interface + } +} + +#[async_trait] +impl Interface for InteractiveInterface { + + async fn send_logs(&mut self, mut logs: Caches) { + + let mut all_logs = logs.get_all(); + let mut columns: Vec = Vec::new(); + for content_type in all_logs.iter_mut() { + columns.append(&mut crate::interfaces::file_interface::get_all_columns(content_type)); + } + self.tx_log.send(columns.clone()).unwrap(); + + for logs in all_logs.iter_mut() { + for log in logs.iter_mut() { + let new_log = crate::interfaces::file_interface::fill_log(log, &columns); + self.tx_log.send(new_log).unwrap(); + } + } + } +} diff --git a/src/interfaces/mod.rs b/src/interfaces/mod.rs index d569f14..9fbc09a 100644 --- a/src/interfaces/mod.rs +++ b/src/interfaces/mod.rs @@ -3,3 +3,4 @@ pub(crate) mod fluentd_interface; pub(crate) mod graylog_interface; pub(crate) mod azure_oms_interface; pub mod interface; +pub mod interactive_interface; diff --git a/src/main.rs b/src/main.rs index 29e4d02..bc9e7bd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,19 @@ +use std::sync::Arc; use clap::Parser; use crate::collector::Collector; use crate::config::Config; -use log::LevelFilter; +use log::{error, Level, LevelFilter, Log, Metadata, Record}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; +use tokio::sync::Mutex; +use crate::data_structures::RunState; +use crate::interactive_mode::interactive; mod collector; mod api_connection; mod data_structures; mod config; mod interfaces; +mod interactive_mode; #[tokio::main] @@ -15,14 +21,27 @@ async fn main() { let args = data_structures::CliArgs::parse(); let config = Config::new(args.config.clone()); - init_logging(&config); + let (log_tx, log_rx) = unbounded_channel(); - let runs = config.get_needed_runs(); - let mut collector = Collector::new(args, config, runs).await; - collector.monitor().await; + if args.interactive { + init_interactive_logging(&config, log_tx); + interactive::run(args, config, log_rx).await.unwrap(); + } else { + init_non_interactive_logging(&config); + let state = RunState::default(); + let wrapped_state = Arc::new(Mutex::new(state)); + let runs = config.get_needed_runs(); + match Collector::new(args, config, runs, wrapped_state.clone(), None).await { + Ok(mut collector) => collector.monitor().await, + Err(e) => { + error!("Could not start collector: {}", e); + panic!("Could not start collector: {}", e); + } + } + } } -fn init_logging(config: &Config) { +fn init_non_interactive_logging(config: &Config) { let (path, level) = if let Some(log_config) = &config.log { let level = if log_config.debug { LevelFilter::Debug } else { LevelFilter::Info }; @@ -30,9 +49,49 @@ fn init_logging(config: &Config) { } else { ("".to_string(), LevelFilter::Info) }; + if !path.is_empty() { 
         simple_logging::log_to_file(path, level).unwrap();
     } else {
         simple_logging::log_to_stderr(level);
     }
 }
+
+fn init_interactive_logging(config: &Config, log_tx: UnboundedSender<(String, Level)>) {
+
+    let level = if let Some(log_config) = &config.log {
+        if log_config.debug { LevelFilter::Debug } else { LevelFilter::Info }
+    } else {
+        LevelFilter::Info
+    };
+    log::set_max_level(level);
+    log::set_boxed_logger(InteractiveLogger::new(log_tx)).unwrap();
+}
+
+
+pub struct InteractiveLogger {
+    log_tx: UnboundedSender<(String, Level)>,
+
+}
+impl InteractiveLogger {
+    pub fn new(log_tx: UnboundedSender<(String, Level)>) -> Box {
+        Box::new(InteractiveLogger { log_tx })
+    }
+}
+impl Log for InteractiveLogger {
+    fn enabled(&self, metadata: &Metadata) -> bool {
+        metadata.level() <= Level::Info
+    }
+    fn log(&self, record: &Record) {
+
+        let date = chrono::Utc::now().to_string();
+        let msg = format!("[{}] {}:{} -- {}",
+            date,
+            record.level(),
+            record.target(),
+            record.args());
+        self.log_tx.send((msg, record.level())).unwrap()
+    }
+    fn flush(&self) {}
+}
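
For reference, below is a minimal, self-contained sketch of the logging hand-off the patch relies on: a Log implementation pushes formatted records into an unbounded channel, and the interactive loop drains that channel on every tick (in the patch, interactive::run does this via log_rx.try_recv()). The name ChannelLogger and the invocation shown in the comment are illustrative only and not part of the patch; the real implementation is InteractiveLogger in src/main.rs, wired up by init_interactive_logging. It assumes only the log and tokio dependencies already present in Cargo.toml.

// Sketch of the channel-backed logger used by the interactive mode. The tool
// itself would be launched with the new flags from CliArgs, e.g.
// `<binary> --tenant-id <id> --client-id <id> --secret-key <key> --config config.yaml --interactive`
// (binary name omitted; exact flag spelling follows clap's default kebab-case).
use log::{Level, LevelFilter, Log, Metadata, Record};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

struct ChannelLogger {
    tx: UnboundedSender<(String, Level)>,
}

impl Log for ChannelLogger {
    fn enabled(&self, metadata: &Metadata) -> bool {
        metadata.level() <= Level::Info
    }
    fn log(&self, record: &Record) {
        // Same idea as InteractiveLogger::log: format the record and hand it to
        // whoever owns the receiver. send() only fails if the receiver has been
        // dropped, so the UI side must outlive the logger.
        let msg = format!("{}:{} -- {}", record.level(), record.target(), record.args());
        let _ = self.tx.send((msg, record.level()));
    }
    fn flush(&self) {}
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = unbounded_channel();
    log::set_boxed_logger(Box::new(ChannelLogger { tx })).unwrap();
    log::set_max_level(LevelFilter::Info);

    log::info!("collector started");

    // interactive::run() drains the channel with try_recv() on every tick and
    // appends the messages to its Logs pane; a plain recv() shows the same flow.
    if let Some((msg, level)) = rx.recv().await {
        println!("[{level}] {msg}");
    }
}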