Skip to content

Commit d2aa7ca

Browse files
authored
Merge pull request #64 from ddbnl/master
release logging
2 parents 42c21bf + 25068d8 commit d2aa7ca

File tree

8 files changed

+203
-34
lines changed

8 files changed

+203
-34
lines changed

Cargo.lock

+59-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ edition = "2021"
55

66

77
[dependencies]
8+
anyhow = "1.0.81"
89
log = "0.4.16"
910
simple_logger = "4.3.3"
1011
chrono = "0.4.19"
@@ -23,3 +24,4 @@ base64 = "0.22.0"
2324
hmac = "0.12.1"
2425
sha2 = "0.10.8"
2526
async-trait = "0.1.77"
27+
simple-logging = "2.0.2"

Release/Linux/OfficeAuditLogCollector

100755100644
-9.18 MB
Binary file not shown.
-4.85 MB
Binary file not shown.

src/api_connection.rs

+89-16
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::collections::HashMap;
22
use reqwest;
3-
use log::{debug, warn, error};
3+
use log::{debug, warn, error, info};
44
use reqwest::header::{AUTHORIZATION, CONTENT_TYPE, HeaderMap};
55
use tokio;
66
use serde_json;
@@ -9,17 +9,19 @@ use futures::channel::mpsc::{Receiver, Sender};
99
use crate::config::Config;
1010
use crate::data_structures::{JsonList, StatusMessage, GetBlobConfig, GetContentConfig, AuthResult,
1111
ContentToRetrieve, CliArgs};
12+
use anyhow::Result;
13+
use serde_json::Value;
1214

1315

1416
/// Return a logged in API connection object. Use the Headers value to make API requests.
15-
pub fn get_api_connection(args: CliArgs, config: Config) -> ApiConnection {
17+
pub async fn get_api_connection(args: CliArgs, config: Config) -> ApiConnection {
1618

1719
let mut api = ApiConnection {
1820
args,
1921
config,
2022
headers: HeaderMap::new(),
2123
};
22-
api.login();
24+
api.login().await;
2325
api
2426
}
2527

@@ -34,7 +36,8 @@ pub struct ApiConnection {
3436
impl ApiConnection {
3537
/// Use tenant_id, client_id and secret_key to request a bearer token and store it in
3638
/// our headers. Must be called once before requesting any content.
37-
fn login(&mut self) {
39+
async fn login(&mut self) {
40+
info!("Logging in to Office Management API.");
3841
let auth_url = format!("https://login.microsoftonline.com/{}/oauth2/token",
3942
self.args.tenant_id.to_string());
4043

@@ -48,43 +51,113 @@ impl ApiConnection {
4851

4952
self.headers.insert(CONTENT_TYPE, "application/x-www-form-urlencoded".parse().unwrap());
5053

51-
let login_client = reqwest::blocking::Client::new();
52-
let json: AuthResult = login_client
54+
let login_client = reqwest::Client::new();
55+
let result = login_client
5356
.post(auth_url)
5457
.headers(self.headers.clone())
5558
.form(&params)
5659
.send()
57-
.unwrap_or_else(|e| panic!("Could not send API login request: {}", e))
58-
.json()
59-
.unwrap_or_else(|e| panic!("Could not parse API login reply: {}", e));
60+
.await;
61+
let response = match result {
62+
Ok(response) => response,
63+
Err(e) => {
64+
let msg = format!("Could not send API login request: {}", e);
65+
error!("{}", msg);
66+
panic!("{}", msg);
67+
}
68+
};
69+
if !response.status().is_success() {
70+
let text = match response.text().await {
71+
Ok(text) => text,
72+
Err(e) => {
73+
let msg = format!("Received error response to API login, but could not parse response: {}", e);
74+
error!("{}", msg);
75+
panic!("{}", msg);
76+
}
77+
};
78+
let msg = format!("Received error response to API login: {}", text);
79+
error!("{}", msg);
80+
panic!("{}", msg);
81+
}
82+
let json = match response.json::<AuthResult>().await {
83+
Ok(json) => json,
84+
Err(e) => {
85+
let msg = format!("Could not parse API login reply: {}", e);
86+
error!("{}", msg);
87+
panic!("{}", msg);
88+
}
89+
};
6090

6191
let token = format!("bearer {}", json.access_token);
6292
self.headers.insert(AUTHORIZATION, token.parse().unwrap());
93+
info!("Successfully logged in to Office Management API.")
6394
}
6495

6596
fn get_base_url(&self) -> String {
6697
format!("https://manage.office.com/api/v1.0/{}/activity/feed", self.args.tenant_id)
6798
}
6899

69-
pub fn subscribe_to_feeds(&self) {
100+
pub async fn subscribe_to_feeds(&self) -> Result<()> {
70101

71-
let content_types = self.config.collect.content_types.get_content_type_strings();
102+
info!("Subscribing to audit feeds.");
103+
let mut content_types = self.config.collect.content_types.get_content_type_strings();
72104

73-
let client = reqwest::blocking::Client::new();
105+
let client = reqwest::Client::new();
106+
info!("Getting current audit feed subscriptions.");
107+
let url = format!("{}/subscriptions/list", self.get_base_url());
108+
let result: Vec<HashMap<String, Value>> = client
109+
.get(url)
110+
.headers(self.headers.clone())
111+
.header("content-length", 0)
112+
.send()
113+
.await?
114+
.json()
115+
.await?;
116+
for subscription in result {
117+
let status = subscription
118+
.get("status")
119+
.expect("No status in JSON")
120+
.as_str()
121+
.unwrap()
122+
.to_string()
123+
.to_lowercase();
124+
if status == "enabled" {
125+
let content_type = subscription
126+
.get("contentType")
127+
.expect("No contentType in JSON")
128+
.as_str()
129+
.unwrap()
130+
.to_string()
131+
.to_lowercase();
132+
if let Some(i) = content_types
133+
.iter()
134+
.position(|x| x.to_lowercase() == content_type) {
135+
info!("Already subscribed to feed {}", content_type);
136+
content_types.remove(i);
137+
}
138+
}
139+
}
74140
for content_type in content_types {
75141
let url = format!("{}/subscriptions/start?contentType={}",
76142
self.get_base_url(),
77143
content_type
78144
);
79-
client
145+
debug!("Subscribing to {} feed.", content_type);
146+
let response = client
80147
.post(url)
81148
.headers(self.headers.clone())
82149
.header("content-length", 0)
83150
.send()
84-
.unwrap_or_else(
85-
|e| panic!("Error setting feed subscription status {}", e)
86-
);
151+
.await?;
152+
if !response.status().is_success() {
153+
let text = response.text().await?;
154+
let msg = format!("Received error response subscribing to audit feed {}: {}", content_type, text);
155+
error!("{}", msg);
156+
panic!("{}", msg);
157+
}
87158
}
159+
info!("All audit feeds subscriptions exist.");
160+
Ok(())
88161
}
89162

90163

src/collector.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ pub struct Collector {
4343

4444
impl Collector {
4545

46-
pub fn new(args: CliArgs, config: Config, runs: HashMap<String, Vec<(String, String)>>) -> Collector {
46+
pub async fn new(args: CliArgs, config: Config, runs: HashMap<String, Vec<(String, String)>>) -> Collector {
4747

4848
// Initialize interfaces
4949
let mut interfaces: Vec<Box<dyn Interface>> = Vec::new();
@@ -63,8 +63,12 @@ impl Collector {
6363
// Initialize collector threads
6464
let api = api_connection::get_api_connection(
6565
args.clone(), config.clone()
66-
);
67-
api.subscribe_to_feeds();
66+
).await;
67+
if let Err(e) = api.subscribe_to_feeds().await {
68+
let msg = format!("Error subscribing to audit feeds: {}", e);
69+
error!("{}", msg);
70+
panic!("{}", msg);
71+
}
6872

6973
let known_blobs = config.load_known_blobs();
7074
let (result_rx, stats_rx, kill_tx) =

src/interfaces/azure_oms_interface.rs

+27-13
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use base64::prelude::BASE64_STANDARD;
44
use chrono::Utc;
55
use futures::{stream, StreamExt};
66
use hmac::{Hmac, Mac};
7-
use log::warn;
7+
use log::{error, info, warn};
88
use sha2::Sha256;
99
use crate::config::Config;
1010
use crate::data_structures::Caches;
@@ -56,7 +56,7 @@ impl Interface for OmsInterface {
5656
async fn send_logs(&mut self, logs: Caches) {
5757
let client = reqwest::Client::new();
5858

59-
println!("SEND");
59+
info!("Sending logs to OMS interface.");
6060
let mut requests = Vec::new();
6161
for (content_type, content_logs) in logs.get_all_types() {
6262
for log in content_logs.iter() {
@@ -75,33 +75,47 @@ impl Interface for OmsInterface {
7575
}
7676
}
7777

78+
let resource = "/api/logs";
79+
let uri = format!("https://{}.ods.opinsights.azure.com{}?api-version=2016-04-01",
80+
self.config.output.oms.as_ref().unwrap().workspace_id, resource);
81+
82+
info!("URL for OMS calls will be: {}", uri);
7883
let calls = stream::iter(requests)
7984
.map(|(body, table_name, time_value, content_length)| {
8085
let client = client.clone();
81-
let rfc1123date = Utc::now().format("%a, %d %b %Y %H:%M:%S GMT");
86+
let uri = uri.clone();
8287
let method = "POST".to_string();
8388
let content_type = "application/json".to_string();
84-
let resource = "/api/logs".to_string();
85-
let signature = self.build_signature(rfc1123date.to_string(), content_length,
86-
method.clone(), content_type.to_string(),
89+
let rfc1123date = Utc::now().format("%a, %d %b %Y %H:%M:%S GMT").to_string();
90+
let signature = self.build_signature(rfc1123date.clone(), content_length,
91+
method.clone(), content_type.clone(),
8792
resource.to_string());
8893

89-
90-
let uri = format!("https://{}.ods.opinsights.azure.com{}?api-version=2016-04-01",
91-
self.config.output.oms.as_ref().unwrap().workspace_id, resource);
9294
tokio::spawn(async move {
93-
let resp = client
95+
let result = client
9496
.post(uri)
9597
.header("content-type", "application/json")
9698
.header("content-length", content_length)
9799
.header("Authorization", signature)
98100
.header("Log-Type", table_name)
99-
.header("x-ms-date", rfc1123date.to_string())
101+
.header("x-ms-date", rfc1123date.clone())
100102
.header("time-generated-field", time_value)
101103
.body(body)
102104
.send()
103-
.await.unwrap();
104-
resp.bytes().await
105+
.await;
106+
match result {
107+
Ok(response) => {
108+
if !response.status().is_success() {
109+
match response.text().await {
110+
Ok(text) => error!("Error response after sending log to OMS: {}", text),
111+
Err(e) => error!("Error response after sending log to OMS, but could not parse response: {}", e),
112+
}
113+
}
114+
},
115+
Err(e) => {
116+
error!("Error send log to OMS: {}", e);
117+
}
118+
}
105119
})
106120
})
107121
.buffer_unordered(10);

0 commit comments

Comments
 (0)