From f16e6e2ac63b2be326feaf48267a6c355b79874c Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Mon, 7 Aug 2023 12:41:09 -0400 Subject: [PATCH 1/5] chore(azure_monitor_logs sink): refactor to new sink style --- .../config.rs} | 640 ++++++------------ src/sinks/azure_monitor_logs/mod.rs | 5 + src/sinks/azure_monitor_logs/service.rs | 249 +++++++ src/sinks/azure_monitor_logs/sink.rs | 214 ++++++ 4 files changed, 680 insertions(+), 428 deletions(-) rename src/sinks/{azure_monitor_logs.rs => azure_monitor_logs/config.rs} (50%) create mode 100644 src/sinks/azure_monitor_logs/mod.rs create mode 100644 src/sinks/azure_monitor_logs/service.rs create mode 100644 src/sinks/azure_monitor_logs/sink.rs diff --git a/src/sinks/azure_monitor_logs.rs b/src/sinks/azure_monitor_logs/config.rs similarity index 50% rename from src/sinks/azure_monitor_logs.rs rename to src/sinks/azure_monitor_logs/config.rs index ba8435499f359..a1bfab3400e81 100644 --- a/src/sinks/azure_monitor_logs.rs +++ b/src/sinks/azure_monitor_logs/config.rs @@ -1,39 +1,27 @@ -use bytes::Bytes; -use futures::{FutureExt, SinkExt}; -use http::{ - header, - header::{HeaderMap, HeaderName, HeaderValue}, - Request, StatusCode, Uri, -}; -use hyper::Body; -use lookup::lookup_v2::OptionalValuePath; -use lookup::{OwnedValuePath, PathPrefix}; -use once_cell::sync::Lazy; -use openssl::{base64, hash, pkey, sign}; -use regex::Regex; -use serde::{Deserialize, Serialize}; -use serde_json::Value as JsonValue; +use lookup::{lookup_v2::OptionalValuePath, OwnedValuePath}; +use openssl::{base64, pkey}; + use vector_common::sensitive_string::SensitiveString; use vector_config::configurable_component; -use vector_core::schema; +use vector_core::{config::log_schema, schema}; use vrl::value::Kind; use crate::{ - codecs::Transformer, - config::{log_schema, AcknowledgementsConfig, Input, SinkConfig, SinkContext}, - event::{Event, Value}, - http::HttpClient, + http::{get_http_scheme_from_uri, HttpClient}, sinks::{ - util::{ - http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, - BatchConfig, BoxedRawValue, JsonArrayBuffer, RealtimeSizeBasedDefaultBatchSettings, - TowerRequestConfig, - }, - Healthcheck, VectorSink, + prelude::*, + util::{http::HttpStatusRetryLogic, RealtimeSizeBasedDefaultBatchSettings, UriSerde}, }, - tls::{TlsConfig, TlsSettings}, }; +use super::{ + service::{AzureMonitorLogsResponse, AzureMonitorLogsService}, + sink::AzureMonitorLogsSink, +}; + +/// Max number of bytes in request body +const MAX_BATCH_SIZE: usize = 30 * 1024 * 1024; + fn default_host() -> String { "ods.opinsights.azure.com".into() } @@ -147,273 +135,93 @@ impl Default for AzureMonitorLogsConfig { } } -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)] -#[serde(rename_all = "snake_case")] -#[derivative(Default)] -pub enum Encoding { - #[derivative(Default)] - Default, -} - -static LOG_TYPE_REGEX: Lazy = Lazy::new(|| Regex::new(r"^\w+$").unwrap()); -static LOG_TYPE_HEADER: Lazy = Lazy::new(|| HeaderName::from_static("log-type")); -static X_MS_DATE_HEADER: Lazy = Lazy::new(|| HeaderName::from_static(X_MS_DATE)); -static X_MS_AZURE_RESOURCE_HEADER: Lazy = - Lazy::new(|| HeaderName::from_static("x-ms-azureresourceid")); -static TIME_GENERATED_FIELD_HEADER: Lazy = - Lazy::new(|| HeaderName::from_static("time-generated-field")); -static CONTENT_TYPE_VALUE: Lazy = Lazy::new(|| HeaderValue::from_static(CONTENT_TYPE)); +impl AzureMonitorLogsConfig { + fn build_shared_key(&self) -> crate::Result> { + if self.shared_key.inner().is_empty() { + return Err("shared_key cannot be an empty string".into()); + } + let shared_key_bytes = base64::decode_block(self.shared_key.inner())?; + let shared_key = pkey::PKey::hmac(&shared_key_bytes)?; + Ok(shared_key) + } -impl_generate_config_from_default!(AzureMonitorLogsConfig); + fn get_time_generated_key(&self) -> Option { + self.time_generated_key + .clone() + .and_then(|k| k.path) + .or_else(|| log_schema().timestamp_key().cloned()) + } -/// Max number of bytes in request body -const MAX_BATCH_SIZE: usize = 30 * 1024 * 1024; -/// API endpoint for submitting logs -const RESOURCE: &str = "/api/logs"; -/// JSON content type of logs -const CONTENT_TYPE: &str = "application/json"; -/// Custom header used for signing logs -const X_MS_DATE: &str = "x-ms-date"; -/// Shared key prefix -const SHARED_KEY: &str = "SharedKey"; -/// API version -const API_VERSION: &str = "2016-04-01"; + async fn build_inner( + &self, + cx: SinkContext, + endpoint: UriSerde, + ) -> crate::Result<(VectorSink, Healthcheck)> { + let endpoint = endpoint.with_default_parts().uri; + let protocol = get_http_scheme_from_uri(&endpoint).to_string(); -#[async_trait::async_trait] -#[typetag::serde(name = "azure_monitor_logs")] -impl SinkConfig for AzureMonitorLogsConfig { - async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { let batch_settings = self .batch .validate()? .limit_max_bytes(MAX_BATCH_SIZE)? - .into_batch_settings()?; + .into_batcher_settings()?; - let time_generated_key = self.time_generated_key.clone().and_then(|k| k.path); + let shared_key = self.build_shared_key()?; + let time_generated_key = self.get_time_generated_key(); let tls_settings = TlsSettings::from_options(&self.tls)?; let client = HttpClient::new(Some(tls_settings), &cx.proxy)?; - let sink = AzureMonitorLogsSink::new(self, time_generated_key)?; - let request_settings = self.request.unwrap_with(&TowerRequestConfig::default()); - - let healthcheck = healthcheck(sink.clone(), client.clone()).boxed(); - - let sink = BatchedHttpSink::new( - sink, - JsonArrayBuffer::new(batch_settings.size), - request_settings, - batch_settings.timeout, + let service = AzureMonitorLogsService::new( client, - ) - .sink_map_err(|error| error!(message = "Fatal azure_monitor_logs sink error.", %error)); - - #[allow(deprecated)] - Ok((VectorSink::from_event_sink(sink), healthcheck)) - } - - fn input(&self) -> Input { - let requirements = - schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp()); - - Input::log().with_schema_requirement(requirements) - } - - fn acknowledgements(&self) -> &AcknowledgementsConfig { - &self.acknowledgements - } -} - -#[derive(Clone)] -struct AzureMonitorLogsSink { - uri: Uri, - customer_id: String, - time_generated_key: Option, - transformer: Transformer, - shared_key: pkey::PKey, - default_headers: HeaderMap, -} - -struct AzureMonitorLogsEventEncoder { - transformer: Transformer, - time_generated_key: Option, -} - -impl HttpEventEncoder for AzureMonitorLogsEventEncoder { - fn encode_event(&mut self, mut event: Event) -> Option { - self.transformer.transform(&mut event); - - // it seems like Azure Monitor doesn't support full 9-digit nanosecond precision - // adjust the timestamp format accordingly, keeping only milliseconds - let mut log = event.into_log(); - - // `.remove_timestamp()` will return the `timestamp` value regardless of location in Event or - // Metadata, the following `insert()` ensures it's encoded in the request. - let timestamp = if let Some(Value::Timestamp(ts)) = log.remove_timestamp() { - ts - } else { - chrono::Utc::now() - }; - - if let Some(timestamp_key) = &self.time_generated_key { - log.insert( - (PathPrefix::Event, timestamp_key), - JsonValue::String(timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)), - ); - } - - let entry = serde_json::json!(&log); - - Some(entry) - } -} - -#[async_trait::async_trait] -impl HttpSink for AzureMonitorLogsSink { - type Input = serde_json::Value; - type Output = Vec; - type Encoder = AzureMonitorLogsEventEncoder; - - fn build_encoder(&self) -> Self::Encoder { - AzureMonitorLogsEventEncoder { - transformer: self.transformer.clone(), - time_generated_key: self.time_generated_key.clone(), - } - } - - async fn build_request(&self, events: Self::Output) -> crate::Result> { - self.build_request_sync(events) - } -} - -impl AzureMonitorLogsSink { - fn new( - config: &AzureMonitorLogsConfig, - time_generated_key: Option, - ) -> crate::Result { - let url = format!( - "https://{}.{}{}?api-version={}", - config.customer_id, config.host, RESOURCE, API_VERSION - ); - let uri: Uri = url.parse()?; - - if config.shared_key.inner().is_empty() { - return Err("shared_key can't be an empty string".into()); - } - - let time_generated_key = - time_generated_key.or_else(|| log_schema().timestamp_key().cloned()); - - let shared_key_bytes = base64::decode_block(config.shared_key.inner())?; - let shared_key = pkey::PKey::hmac(&shared_key_bytes)?; - let mut default_headers = HeaderMap::with_capacity(3); - - if config.log_type.len() > 100 || !LOG_TYPE_REGEX.is_match(&config.log_type) { - return Err(format!( - "invalid log_type \"{}\": log type can only contain letters, numbers, and underscore (_), and may not exceed 100 characters", - config.log_type - ).into()); - } - - let log_type = HeaderValue::from_str(&config.log_type)?; - default_headers.insert(LOG_TYPE_HEADER.clone(), log_type); - - if let Some(timestamp_key) = &time_generated_key { - default_headers.insert( - TIME_GENERATED_FIELD_HEADER.clone(), - HeaderValue::try_from(timestamp_key.to_string())?, - ); - } - - if let Some(azure_resource_id) = &config.azure_resource_id { - if azure_resource_id.is_empty() { - return Err("azure_resource_id can't be an empty string".into()); - } - - default_headers.insert( - X_MS_AZURE_RESOURCE_HEADER.clone(), - HeaderValue::from_str(azure_resource_id)?, - ); - } - - default_headers.insert(header::CONTENT_TYPE, CONTENT_TYPE_VALUE.clone()); - - Ok(AzureMonitorLogsSink { - uri, - transformer: config.encoding.clone(), - customer_id: config.customer_id.clone(), + endpoint, + self.customer_id.clone(), + self.azure_resource_id.as_deref(), + &self.log_type, + time_generated_key.clone(), shared_key, - default_headers, + )?; + let healthcheck = service.healthcheck(); + + let retry_logic = + HttpStatusRetryLogic::new(|res: &AzureMonitorLogsResponse| res.http_status); + let request_settings = self.request.unwrap_with(&Default::default()); + let service = ServiceBuilder::new() + .settings(request_settings, retry_logic) + .service(service); + + let sink = AzureMonitorLogsSink::new( + batch_settings, + self.encoding.clone(), + service, time_generated_key, - }) - } - - fn build_request_sync(&self, events: Vec) -> crate::Result> { - let body = crate::serde::json::to_bytes(&events)?.freeze(); - let len = body.len(); - - let mut request = Request::post(self.uri.clone()).body(body)?; - let rfc1123date = chrono::Utc::now() - .format("%a, %d %b %Y %H:%M:%S GMT") - .to_string(); - - let authorization = self.build_authorization_header_value(&rfc1123date, len)?; - - *request.headers_mut() = self.default_headers.clone(); - request - .headers_mut() - .insert(header::AUTHORIZATION, authorization.parse()?); - request - .headers_mut() - .insert(X_MS_DATE_HEADER.clone(), rfc1123date.parse()?); - - Ok(request) - } - - fn build_authorization_header_value( - &self, - rfc1123date: &str, - len: usize, - ) -> crate::Result { - let string_to_hash = format!( - "POST\n{}\n{}\n{}:{}\n{}", - len, CONTENT_TYPE, X_MS_DATE, rfc1123date, RESOURCE + protocol, ); - let mut signer = sign::Signer::new(hash::MessageDigest::sha256(), &self.shared_key)?; - signer.update(string_to_hash.as_bytes())?; - let signature = signer.sign_to_vec()?; - let signature_base64 = base64::encode_block(&signature); - - Ok(format!( - "{} {}:{}", - SHARED_KEY, self.customer_id, signature_base64 - )) + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) } } -async fn healthcheck(sink: AzureMonitorLogsSink, client: HttpClient) -> crate::Result<()> { - let request = sink.build_request(vec![]).await?.map(Body::from); - - let res = client.send(request).await?; +impl_generate_config_from_default!(AzureMonitorLogsConfig); - if res.status().is_server_error() { - return Err("Server returned a server error".into()); +#[async_trait::async_trait] +#[typetag::serde(name = "azure_monitor_logs")] +impl SinkConfig for AzureMonitorLogsConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let endpoint = format!("https://{}.{}", self.customer_id, self.host).parse()?; + self.build_inner(cx, endpoint).await } - if res.status() == StatusCode::FORBIDDEN { - return Err("The service failed to authenticate the request. Verify that the workspace ID and connection key are valid".into()); - } + fn input(&self) -> Input { + let requirements = + schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp()); - if res.status() == StatusCode::NOT_FOUND { - return Err("Either the URL provided is incorrect, or the request is too large".into()); + Input::log().with_schema_requirement(requirements) } - if res.status() == StatusCode::BAD_REQUEST { - return Err("The workspace has been closed or the request was invalid".into()); + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements } - - Ok(()) } #[cfg(test)] @@ -421,12 +229,15 @@ mod tests { use std::time::Duration; use futures::{future::ready, stream}; - use serde_json::value::RawValue; + use http::Response; + use hyper::body; + use lookup::PathPrefix; + use openssl::{hash, sign}; + use tokio::time::timeout; use super::*; use crate::{ event::LogEvent, - sinks::util::BatchSize, test_util::{ components::{run_and_assert_sink_compliance, SINK_TAGS}, http::{always_200_response, spawn_blackhole_http_server}, @@ -442,170 +253,21 @@ mod tests { async fn component_spec_compliance() { let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; - // This is just a dummy shared key. - let shared_key_bytes = base64::decode_block( - "ZnNkO2Zhc2RrbGZqYXNkaixmaG5tZXF3dWlsamtmYXNjZmouYXNkbmZrbHFhc2ZtYXNrbA==", - ) - .expect("should not fail to decode base64"); - let shared_key = - pkey::PKey::hmac(&shared_key_bytes).expect("should not fail to create HMAC key"); - - let sink = AzureMonitorLogsSink { - uri: mock_endpoint, - customer_id: "weee".to_string(), - time_generated_key: log_schema().timestamp_key().cloned(), - transformer: Default::default(), - shared_key, - default_headers: HeaderMap::new(), + let config = AzureMonitorLogsConfig { + shared_key: "ZnNkO2Zhc2RrbGZqYXNkaixmaG5tZXF3dWlsamtmYXNjZmouYXNkbmZrbHFhc2ZtYXNrbA==" + .to_string() + .into(), + ..Default::default() }; let context = SinkContext::default(); - let client = - HttpClient::new(None, &context.proxy).expect("should not fail to create HTTP client"); - - let request_settings = - TowerRequestConfig::default().unwrap_with(&TowerRequestConfig::default()); - - let sink = BatchedHttpSink::new( - sink, - JsonArrayBuffer::new(BatchSize::const_default()), - request_settings, - Duration::from_secs(1), - client, - ) - .sink_map_err(|error| error!(message = "Fatal azure_monitor_logs sink error.", %error)); - - let event = Event::Log(LogEvent::from("simple message")); - #[allow(deprecated)] - run_and_assert_sink_compliance( - VectorSink::from_event_sink(sink), - stream::once(ready(event)), - &SINK_TAGS, - ) - .await; - } - - fn insert_timestamp_kv(log: &mut LogEvent) -> (String, String) { - let now = chrono::Utc::now(); - - let timestamp_value = now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true); - log.insert(log_schema().timestamp_key_target_path().unwrap(), now); - - ( - log_schema().timestamp_key().unwrap().to_string(), - timestamp_value, - ) - } - - #[test] - fn encode_valid() { - let config: AzureMonitorLogsConfig = toml::from_str( - r#" - # random GUID and random 64 Base-64 encoded bytes - customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" - log_type = "Vector" - "#, - ) - .unwrap(); - - let sink = AzureMonitorLogsSink::new(&config, None).unwrap(); - let mut log = [("message", "hello world")] - .iter() - .copied() - .collect::(); - let (timestamp_key, timestamp_value) = insert_timestamp_kv(&mut log); - - let event = Event::from(log); - let mut encoder = sink.build_encoder(); - let json = encoder.encode_event(event).unwrap(); - let expected_json = serde_json::json!({ - timestamp_key: timestamp_value, - "message": "hello world" - }); - assert_eq!(json, expected_json); - } - - #[test] - fn correct_request() { - let config: AzureMonitorLogsConfig = toml::from_str( - r#" - # random GUID and random 64 Base-64 encoded bytes - customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" - log_type = "Vector" - azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - "#, - ) - .unwrap(); - - let sink = AzureMonitorLogsSink::new(&config, None).unwrap(); - let mut encoder = sink.build_encoder(); - - let mut log1 = [("message", "hello")].iter().copied().collect::(); - let (timestamp_key1, timestamp_value1) = insert_timestamp_kv(&mut log1); - - let mut log2 = [("message", "world")].iter().copied().collect::(); - let (timestamp_key2, timestamp_value2) = insert_timestamp_kv(&mut log2); - - let event1 = encoder.encode_event(Event::from(log1)).unwrap(); - let event2 = encoder.encode_event(Event::from(log2)).unwrap(); - - let json1 = serde_json::to_string(&event1).unwrap(); - let json2 = serde_json::to_string(&event2).unwrap(); - let raw1 = RawValue::from_string(json1).unwrap(); - let raw2 = RawValue::from_string(json2).unwrap(); - - let events = vec![raw1, raw2]; - - let request = sink.build_request_sync(events); - - let (parts, body) = request.unwrap().into_parts(); - assert_eq!(&parts.method.to_string(), "POST"); - - let json: serde_json::Value = serde_json::from_slice(&body[..]).unwrap(); - let expected_json = serde_json::json!([ - { - timestamp_key1: timestamp_value1, - "message": "hello" - }, - { - timestamp_key2: timestamp_value2, - "message": "world" - } - ]); - assert_eq!(json, expected_json); - - let headers = parts.headers; - let rfc1123date = headers.get("x-ms-date").unwrap(); - - let auth_expected = sink - .build_authorization_header_value(rfc1123date.to_str().unwrap(), body.len()) + let (sink, _healthcheck) = config + .build_inner(context, mock_endpoint.into()) + .await .unwrap(); - let authorization = headers.get("authorization").unwrap(); - assert_eq!(authorization.to_str().unwrap(), &auth_expected); - - let log_type = headers.get("log-type").unwrap(); - assert_eq!(log_type.to_str().unwrap(), "Vector"); - - let time_generated_field = headers.get("time-generated-field").unwrap(); - let timestamp_key = log_schema().timestamp_key(); - assert_eq!( - time_generated_field.to_str().unwrap(), - timestamp_key.unwrap().to_string().as_str() - ); - - let azure_resource_id = headers.get("x-ms-azureresourceid").unwrap(); - assert_eq!( - azure_resource_id.to_str().unwrap(), - "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - ); - - assert_eq!( - &parts.uri.to_string(), - "https://97ce69d9-b4be-4241-8dbd-d265edcf06c4.ods.opinsights.azure.com/api/logs?api-version=2016-04-01" - ); + let event = Event::Log(LogEvent::from("simple message")); + run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await; } #[tokio::test] @@ -692,4 +354,126 @@ mod tests { ) .expect_err("Config parsing failed to error with missing customer_id"); } + + fn insert_timestamp_kv(log: &mut LogEvent) -> (String, String) { + let now = chrono::Utc::now(); + + let timestamp_key = log_schema().timestamp_key().unwrap(); + let timestamp_value = now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true); + log.insert((PathPrefix::Event, timestamp_key), now); + + (timestamp_key.to_string(), timestamp_value) + } + + fn build_authorization_header_value( + shared_key: &pkey::PKey, + customer_id: &str, + rfc1123date: &str, + len: usize, + ) -> crate::Result { + let string_to_hash = + format!("POST\n{len}\napplication/json\nx-ms-date:{rfc1123date}\n/api/logs"); + let mut signer = sign::Signer::new(hash::MessageDigest::sha256(), shared_key)?; + signer.update(string_to_hash.as_bytes())?; + + let signature = signer.sign_to_vec()?; + let signature_base64 = base64::encode_block(&signature); + + Ok(format!("SharedKey {customer_id}:{signature_base64}")) + } + + #[tokio::test] + async fn correct_request() { + let config: AzureMonitorLogsConfig = toml::from_str( + r#" + # random GUID and random 64 Base-64 encoded bytes + customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" + log_type = "Vector" + azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + "#, + ) + .unwrap(); + + let mut log1 = [("message", "hello")].iter().copied().collect::(); + let (timestamp_key1, timestamp_value1) = insert_timestamp_kv(&mut log1); + + let mut log2 = [("message", "world")].iter().copied().collect::(); + let (timestamp_key2, timestamp_value2) = insert_timestamp_kv(&mut log2); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + let mock_endpoint = spawn_blackhole_http_server(move |request| { + let tx = tx.clone(); + async move { + tx.send(request).await.unwrap(); + Ok(Response::new(hyper::Body::empty())) + } + }) + .await; + + let context = SinkContext::default(); + let (sink, _healthcheck) = config + .build_inner(context, mock_endpoint.into()) + .await + .unwrap(); + + run_and_assert_sink_compliance(sink, stream::iter(vec![log1, log2]), &SINK_TAGS).await; + + let request = timeout(Duration::from_millis(500), rx.recv()) + .await + .unwrap() + .unwrap(); + + let (parts, body) = request.into_parts(); + assert_eq!(&parts.method.to_string(), "POST"); + + let body = body::to_bytes(body).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body[..]).unwrap(); + let expected_json = serde_json::json!([ + { + timestamp_key1: timestamp_value1, + "message": "hello" + }, + { + timestamp_key2: timestamp_value2, + "message": "world" + } + ]); + assert_eq!(json, expected_json); + + let headers = parts.headers; + + let rfc1123date = headers.get("x-ms-date").unwrap(); + let shared_key = config.build_shared_key().unwrap(); + let auth_expected = build_authorization_header_value( + &shared_key, + &config.customer_id, + rfc1123date.to_str().unwrap(), + body.len(), + ) + .unwrap(); + let authorization = headers.get("authorization").unwrap(); + assert_eq!(authorization.to_str().unwrap(), &auth_expected); + + let log_type = headers.get("log-type").unwrap(); + assert_eq!(log_type.to_str().unwrap(), "Vector"); + + let time_generated_field = headers.get("time-generated-field").unwrap(); + let timestamp_key = log_schema().timestamp_key(); + assert_eq!( + time_generated_field.to_str().unwrap(), + timestamp_key.unwrap().to_string().as_str() + ); + + let azure_resource_id = headers.get("x-ms-azureresourceid").unwrap(); + assert_eq!( + azure_resource_id.to_str().unwrap(), + "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + ); + + assert_eq!( + &parts.uri.path_and_query().unwrap().to_string(), + "/api/logs?api-version=2016-04-01" + ); + } } diff --git a/src/sinks/azure_monitor_logs/mod.rs b/src/sinks/azure_monitor_logs/mod.rs new file mode 100644 index 0000000000000..05889431c99d7 --- /dev/null +++ b/src/sinks/azure_monitor_logs/mod.rs @@ -0,0 +1,5 @@ +pub mod config; +pub mod service; +pub mod sink; + +pub use config::AzureMonitorLogsConfig; diff --git a/src/sinks/azure_monitor_logs/service.rs b/src/sinks/azure_monitor_logs/service.rs new file mode 100644 index 0000000000000..285edd09e9902 --- /dev/null +++ b/src/sinks/azure_monitor_logs/service.rs @@ -0,0 +1,249 @@ +use bytes::Bytes; +use http::{ + header::{self, HeaderMap}, + HeaderName, HeaderValue, Request, StatusCode, Uri, +}; +use hyper::Body; +use lookup::lookup_v2::OwnedValuePath; +use once_cell::sync::Lazy; +use openssl::{base64, hash, pkey, sign}; +use regex::Regex; +use std::task::{Context, Poll}; +use tracing::Instrument; + +use crate::{http::HttpClient, sinks::prelude::*}; + +static LOG_TYPE_REGEX: Lazy = Lazy::new(|| Regex::new(r"^\w+$").unwrap()); +static LOG_TYPE_HEADER: Lazy = Lazy::new(|| HeaderName::from_static("log-type")); +static X_MS_DATE_HEADER: Lazy = Lazy::new(|| HeaderName::from_static(X_MS_DATE)); +static X_MS_AZURE_RESOURCE_HEADER: Lazy = + Lazy::new(|| HeaderName::from_static("x-ms-azureresourceid")); +static TIME_GENERATED_FIELD_HEADER: Lazy = + Lazy::new(|| HeaderName::from_static("time-generated-field")); +static CONTENT_TYPE_VALUE: Lazy = Lazy::new(|| HeaderValue::from_static(CONTENT_TYPE)); + +/// API endpoint for submitting logs +const RESOURCE: &str = "/api/logs"; +/// JSON content type of logs +const CONTENT_TYPE: &str = "application/json"; +/// Custom header used for signing logs +const X_MS_DATE: &str = "x-ms-date"; +/// Shared key prefix +const SHARED_KEY: &str = "SharedKey"; +/// API version +const API_VERSION: &str = "2016-04-01"; + +#[derive(Debug, Clone)] +pub struct AzureMonitorLogsRequest { + pub body: Bytes, + pub finalizers: EventFinalizers, + pub metadata: RequestMetadata, +} + +impl MetaDescriptive for AzureMonitorLogsRequest { + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata + } +} + +impl Finalizable for AzureMonitorLogsRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + self.finalizers.take_finalizers() + } +} + +pub struct AzureMonitorLogsResponse { + pub http_status: StatusCode, + pub events_byte_size: GroupedCountByteSize, + pub raw_byte_size: usize, +} + +impl DriverResponse for AzureMonitorLogsResponse { + fn event_status(&self) -> EventStatus { + match self.http_status.is_success() { + true => EventStatus::Delivered, + false => EventStatus::Rejected, + } + } + + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_byte_size + } + + fn bytes_sent(&self) -> Option { + Some(self.raw_byte_size) + } +} + +/// `AzureMonitorLogsService` is a `Tower` service used to send logs to Azure. +#[derive(Debug, Clone)] +pub struct AzureMonitorLogsService { + client: HttpClient, + endpoint: Uri, + customer_id: String, + shared_key: pkey::PKey, + default_headers: HeaderMap, +} + +impl AzureMonitorLogsService { + /// Creates a new `AzureMonitorLogsService`. + pub fn new( + client: HttpClient, + endpoint: Uri, + customer_id: String, + azure_resource_id: Option<&str>, + log_type: &str, + time_generated_key: Option, + shared_key: pkey::PKey, + ) -> crate::Result { + let mut parts = endpoint.into_parts(); + parts.path_and_query = Some( + format!("{RESOURCE}?api-version={API_VERSION}") + .parse() + .expect("path and query should never fail to parse"), + ); + let endpoint = Uri::from_parts(parts)?; + + let default_headers = { + let mut headers = HeaderMap::new(); + + if log_type.len() > 100 || !LOG_TYPE_REGEX.is_match(log_type) { + return Err(format!( + "invalid log_type \"{}\": log type can only contain letters, numbers, and underscore (_), and may not exceed 100 characters", + log_type + ).into()); + } + let log_type = HeaderValue::from_str(log_type)?; + headers.insert(LOG_TYPE_HEADER.clone(), log_type); + + if let Some(timestamp_key) = time_generated_key { + headers.insert( + TIME_GENERATED_FIELD_HEADER.clone(), + HeaderValue::try_from(timestamp_key.to_string())?, + ); + } + + if let Some(azure_resource_id) = azure_resource_id { + if azure_resource_id.is_empty() { + return Err("azure_resource_id can't be an empty string".into()); + } + headers.insert( + X_MS_AZURE_RESOURCE_HEADER.clone(), + HeaderValue::from_str(azure_resource_id)?, + ); + } + + headers.insert(header::CONTENT_TYPE, CONTENT_TYPE_VALUE.clone()); + headers + }; + + Ok(Self { + client, + endpoint, + customer_id, + shared_key, + default_headers, + }) + } + + fn build_authorization_header_value( + &self, + rfc1123date: &str, + len: usize, + ) -> crate::Result { + let string_to_hash = + format!("POST\n{len}\n{CONTENT_TYPE}\n{X_MS_DATE}:{rfc1123date}\n{RESOURCE}"); + let mut signer = sign::Signer::new(hash::MessageDigest::sha256(), &self.shared_key)?; + signer.update(string_to_hash.as_bytes())?; + + let signature = signer.sign_to_vec()?; + let signature_base64 = base64::encode_block(&signature); + + Ok(format!( + "{} {}:{}", + SHARED_KEY, self.customer_id, signature_base64 + )) + } + + fn build_request(&self, body: Bytes) -> crate::Result> { + let len = body.len(); + + let mut request = Request::post(&self.endpoint).body(Body::from(body))?; + + let rfc1123date = chrono::Utc::now() + .format("%a, %d %b %Y %H:%M:%S GMT") + .to_string(); + let authorization = self.build_authorization_header_value(&rfc1123date, len)?; + + *request.headers_mut() = self.default_headers.clone(); + request + .headers_mut() + .insert(header::AUTHORIZATION, authorization.parse()?); + request + .headers_mut() + .insert(X_MS_DATE_HEADER.clone(), rfc1123date.parse()?); + + Ok(request) + } + + pub fn healthcheck(&self) -> Healthcheck { + let mut client = self.client.clone(); + let request = self.build_request(Bytes::from("[]")); + Box::pin(async move { + let request = request?; + let res = client.call(request).in_current_span().await?; + + if res.status().is_server_error() { + return Err("Server returned a server error".into()); + } + + if res.status() == StatusCode::FORBIDDEN { + return Err("The service failed to authenticate the request. Verify that the workspace ID and connection key are valid".into()); + } + + if res.status() == StatusCode::NOT_FOUND { + return Err( + "Either the URL provided is incorrect, or the request is too large".into(), + ); + } + + if res.status() == StatusCode::BAD_REQUEST { + return Err("The workspace has been closed or the request was invalid".into()); + } + + Ok(()) + }) + } +} + +impl Service for AzureMonitorLogsService { + type Response = AzureMonitorLogsResponse; + type Error = crate::Error; + type Future = BoxFuture<'static, Result>; + + // Emission of Error internal event is handled upstream by the caller. + fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + // Emission of Error internal event is handled upstream by the caller. + fn call(&mut self, request: AzureMonitorLogsRequest) -> Self::Future { + let mut client = self.client.clone(); + let http_request = self.build_request(request.body); + Box::pin(async move { + let http_request = http_request?; + let response = client.call(http_request).in_current_span().await?; + Ok(AzureMonitorLogsResponse { + http_status: response.status(), + raw_byte_size: request.metadata.request_encoded_size(), + events_byte_size: request + .metadata + .into_events_estimated_json_encoded_byte_size(), + }) + }) + } +} diff --git a/src/sinks/azure_monitor_logs/sink.rs b/src/sinks/azure_monitor_logs/sink.rs new file mode 100644 index 0000000000000..9db80bcb95c43 --- /dev/null +++ b/src/sinks/azure_monitor_logs/sink.rs @@ -0,0 +1,214 @@ +use std::{fmt::Debug, io}; + +use bytes::Bytes; +use codecs::{encoding::Framer, CharacterDelimitedEncoder, JsonSerializerConfig}; +use lookup::{OwnedValuePath, PathPrefix}; + +use crate::sinks::prelude::*; + +use super::service::AzureMonitorLogsRequest; + +pub struct AzureMonitorLogsSink { + batch_settings: BatcherSettings, + encoding: JsonEncoding, + service: S, + protocol: String, +} + +impl AzureMonitorLogsSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: Debug + Into + Send, +{ + pub fn new( + batch_settings: BatcherSettings, + transformer: Transformer, + service: S, + time_generated_key: Option, + protocol: String, + ) -> Self { + Self { + batch_settings, + encoding: JsonEncoding::new(transformer, time_generated_key), + service, + protocol, + } + } + + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + input + .batched(self.batch_settings.into_byte_size_config()) + .request_builder( + None, + AzureMonitorLogsRequestBuilder { + encoding: self.encoding, + }, + ) + .filter_map(|request| async { + match request { + Err(error) => { + emit!(SinkRequestBuildError { error }); + None + } + Ok(req) => Some(req), + } + }) + .into_driver(self.service) + .protocol(self.protocol.clone()) + .run() + .await + } +} + +#[async_trait::async_trait] +impl StreamSink for AzureMonitorLogsSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: Debug + Into + Send, +{ + async fn run( + self: Box, + input: futures_util::stream::BoxStream<'_, Event>, + ) -> Result<(), ()> { + self.run_inner(input).await + } +} + +/// Customized encoding specific to the Azure Monitor Logs sink, as the API does not support full +/// 9-digit nanosecond precision timestamps. +#[derive(Clone, Debug)] +struct JsonEncoding { + time_generated_key: Option, + encoder: (Transformer, Encoder), +} + +impl JsonEncoding { + pub fn new(transformer: Transformer, time_generated_key: Option) -> Self { + Self { + time_generated_key, + encoder: ( + transformer, + Encoder::::new( + CharacterDelimitedEncoder::new(b',').into(), + JsonSerializerConfig::default().build().into(), + ), + ), + } + } +} + +impl crate::sinks::util::encoding::Encoder> for JsonEncoding { + fn encode_input( + &self, + mut input: Vec, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { + for event in input.iter_mut() { + let log = event.as_mut_log(); + + // `.remove_timestamp()` will return the `timestamp` value regardless of location in Event or + // Metadata, the following `insert()` ensures it's encoded in the request. + let timestamp = if let Some(Value::Timestamp(ts)) = log.remove_timestamp() { + ts + } else { + chrono::Utc::now() + }; + + if let Some(timestamp_key) = &self.time_generated_key { + log.insert( + (PathPrefix::Event, timestamp_key), + serde_json::Value::String( + timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true), + ), + ); + } + } + + self.encoder.encode_input(input, writer) + } +} + +struct AzureMonitorLogsRequestBuilder { + encoding: JsonEncoding, +} + +impl RequestBuilder> for AzureMonitorLogsRequestBuilder { + type Metadata = EventFinalizers; + type Events = Vec; + type Encoder = JsonEncoding; + type Payload = Bytes; + type Request = AzureMonitorLogsRequest; + type Error = std::io::Error; + + fn compression(&self) -> Compression { + Compression::None + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoding + } + + fn split_input( + &self, + mut events: Vec, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let finalizers = events.take_finalizers(); + let builder = RequestMetadataBuilder::from_events(&events); + (finalizers, builder, events) + } + + fn build_request( + &self, + finalizers: Self::Metadata, + request_metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + AzureMonitorLogsRequest { + body: payload.into_payload(), + finalizers, + metadata: request_metadata, + } + } +} + +#[cfg(test)] +pub mod tests { + use vector_core::config::log_schema; + + use super::*; + use crate::{event::LogEvent, sinks::util::encoding::Encoder}; + + fn insert_timestamp_kv(log: &mut LogEvent) -> (String, String) { + let now = chrono::Utc::now(); + + let timestamp_key = log_schema().timestamp_key().unwrap(); + let timestamp_value = now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true); + log.insert((PathPrefix::Event, timestamp_key), now); + + (timestamp_key.to_string(), timestamp_value) + } + + #[test] + fn encode_valid() { + let mut log = [("message", "hello world")] + .iter() + .copied() + .collect::(); + let (timestamp_key, timestamp_value) = insert_timestamp_kv(&mut log); + + let event = Event::from(log); + let encoder = JsonEncoding::new(Default::default(), log_schema().timestamp_key().cloned()); + let mut encoded = vec![]; + encoder.encode_input(vec![event], &mut encoded).unwrap(); + let expected_json = serde_json::json!([{ + timestamp_key: timestamp_value, + "message": "hello world" + }]); + let json: serde_json::Value = serde_json::from_slice(&encoded).unwrap(); + assert_eq!(json, expected_json); + } +} From d0ecc01040d415acafe2ce66538094a55732b2da Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Mon, 7 Aug 2023 12:52:25 -0400 Subject: [PATCH 2/5] update mod.rs --- src/sinks/azure_monitor_logs/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sinks/azure_monitor_logs/mod.rs b/src/sinks/azure_monitor_logs/mod.rs index 05889431c99d7..7e443e3f5dc7c 100644 --- a/src/sinks/azure_monitor_logs/mod.rs +++ b/src/sinks/azure_monitor_logs/mod.rs @@ -1,5 +1,5 @@ -pub mod config; -pub mod service; -pub mod sink; +mod config; +mod service; +mod sink; pub use config::AzureMonitorLogsConfig; From ad371041e689aad024b7cd2d780be8d79fa4e571 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Wed, 9 Aug 2023 09:45:18 -0400 Subject: [PATCH 3/5] neuronull feedback --- src/sinks/azure_monitor_logs/config.rs | 260 +---------------------- src/sinks/azure_monitor_logs/mod.rs | 8 + src/sinks/azure_monitor_logs/sink.rs | 40 +--- src/sinks/azure_monitor_logs/tests.rs | 275 +++++++++++++++++++++++++ 4 files changed, 287 insertions(+), 296 deletions(-) create mode 100644 src/sinks/azure_monitor_logs/tests.rs diff --git a/src/sinks/azure_monitor_logs/config.rs b/src/sinks/azure_monitor_logs/config.rs index a1bfab3400e81..2fe63c552b9d6 100644 --- a/src/sinks/azure_monitor_logs/config.rs +++ b/src/sinks/azure_monitor_logs/config.rs @@ -22,7 +22,7 @@ use super::{ /// Max number of bytes in request body const MAX_BATCH_SIZE: usize = 30 * 1024 * 1024; -fn default_host() -> String { +pub(super) fn default_host() -> String { "ods.opinsights.azure.com".into() } @@ -136,7 +136,7 @@ impl Default for AzureMonitorLogsConfig { } impl AzureMonitorLogsConfig { - fn build_shared_key(&self) -> crate::Result> { + pub(super) fn build_shared_key(&self) -> crate::Result> { if self.shared_key.inner().is_empty() { return Err("shared_key cannot be an empty string".into()); } @@ -152,7 +152,7 @@ impl AzureMonitorLogsConfig { .or_else(|| log_schema().timestamp_key().cloned()) } - async fn build_inner( + pub(super) async fn build_inner( &self, cx: SinkContext, endpoint: UriSerde, @@ -223,257 +223,3 @@ impl SinkConfig for AzureMonitorLogsConfig { &self.acknowledgements } } - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use futures::{future::ready, stream}; - use http::Response; - use hyper::body; - use lookup::PathPrefix; - use openssl::{hash, sign}; - use tokio::time::timeout; - - use super::*; - use crate::{ - event::LogEvent, - test_util::{ - components::{run_and_assert_sink_compliance, SINK_TAGS}, - http::{always_200_response, spawn_blackhole_http_server}, - }, - }; - - #[test] - fn generate_config() { - crate::test_util::test_generate_config::(); - } - - #[tokio::test] - async fn component_spec_compliance() { - let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; - - let config = AzureMonitorLogsConfig { - shared_key: "ZnNkO2Zhc2RrbGZqYXNkaixmaG5tZXF3dWlsamtmYXNjZmouYXNkbmZrbHFhc2ZtYXNrbA==" - .to_string() - .into(), - ..Default::default() - }; - - let context = SinkContext::default(); - let (sink, _healthcheck) = config - .build_inner(context, mock_endpoint.into()) - .await - .unwrap(); - - let event = Event::Log(LogEvent::from("simple message")); - run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await; - } - - #[tokio::test] - async fn fails_missing_creds() { - let config: AzureMonitorLogsConfig = toml::from_str( - r#" - customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - shared_key = "" - log_type = "Vector" - azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - "#, - ) - .unwrap(); - if config.build(SinkContext::default()).await.is_ok() { - panic!("config.build failed to error"); - } - } - - #[test] - fn correct_host() { - let config_default = toml::from_str::( - r#" - customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" - log_type = "Vector" - "#, - ) - .expect("Config parsing failed without custom host"); - assert_eq!(config_default.host, default_host()); - - let config_cn = toml::from_str::( - r#" - customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" - log_type = "Vector" - host = "ods.opinsights.azure.cn" - "#, - ) - .expect("Config parsing failed with .cn custom host"); - assert_eq!(config_cn.host, "ods.opinsights.azure.cn"); - } - - #[tokio::test] - async fn fails_invalid_base64() { - let config: AzureMonitorLogsConfig = toml::from_str( - r#" - customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - shared_key = "1Qs77Vz40+iDMBBTRmROKJwnEX" - log_type = "Vector" - azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - "#, - ) - .unwrap(); - if config.build(SinkContext::default()).await.is_ok() { - panic!("config.build failed to error"); - } - } - - #[test] - fn fails_config_missing_fields() { - toml::from_str::( - r#" - customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" - azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - "#, - ) - .expect_err("Config parsing failed to error with missing log_type"); - - toml::from_str::( - r#" - customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - log_type = "Vector" - azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - "#, - ) - .expect_err("Config parsing failed to error with missing shared_key"); - - toml::from_str::( - r#" - shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" - log_type = "Vector" - "#, - ) - .expect_err("Config parsing failed to error with missing customer_id"); - } - - fn insert_timestamp_kv(log: &mut LogEvent) -> (String, String) { - let now = chrono::Utc::now(); - - let timestamp_key = log_schema().timestamp_key().unwrap(); - let timestamp_value = now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true); - log.insert((PathPrefix::Event, timestamp_key), now); - - (timestamp_key.to_string(), timestamp_value) - } - - fn build_authorization_header_value( - shared_key: &pkey::PKey, - customer_id: &str, - rfc1123date: &str, - len: usize, - ) -> crate::Result { - let string_to_hash = - format!("POST\n{len}\napplication/json\nx-ms-date:{rfc1123date}\n/api/logs"); - let mut signer = sign::Signer::new(hash::MessageDigest::sha256(), shared_key)?; - signer.update(string_to_hash.as_bytes())?; - - let signature = signer.sign_to_vec()?; - let signature_base64 = base64::encode_block(&signature); - - Ok(format!("SharedKey {customer_id}:{signature_base64}")) - } - - #[tokio::test] - async fn correct_request() { - let config: AzureMonitorLogsConfig = toml::from_str( - r#" - # random GUID and random 64 Base-64 encoded bytes - customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" - log_type = "Vector" - azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - "#, - ) - .unwrap(); - - let mut log1 = [("message", "hello")].iter().copied().collect::(); - let (timestamp_key1, timestamp_value1) = insert_timestamp_kv(&mut log1); - - let mut log2 = [("message", "world")].iter().copied().collect::(); - let (timestamp_key2, timestamp_value2) = insert_timestamp_kv(&mut log2); - - let (tx, mut rx) = tokio::sync::mpsc::channel(1); - let mock_endpoint = spawn_blackhole_http_server(move |request| { - let tx = tx.clone(); - async move { - tx.send(request).await.unwrap(); - Ok(Response::new(hyper::Body::empty())) - } - }) - .await; - - let context = SinkContext::default(); - let (sink, _healthcheck) = config - .build_inner(context, mock_endpoint.into()) - .await - .unwrap(); - - run_and_assert_sink_compliance(sink, stream::iter(vec![log1, log2]), &SINK_TAGS).await; - - let request = timeout(Duration::from_millis(500), rx.recv()) - .await - .unwrap() - .unwrap(); - - let (parts, body) = request.into_parts(); - assert_eq!(&parts.method.to_string(), "POST"); - - let body = body::to_bytes(body).await.unwrap(); - let json: serde_json::Value = serde_json::from_slice(&body[..]).unwrap(); - let expected_json = serde_json::json!([ - { - timestamp_key1: timestamp_value1, - "message": "hello" - }, - { - timestamp_key2: timestamp_value2, - "message": "world" - } - ]); - assert_eq!(json, expected_json); - - let headers = parts.headers; - - let rfc1123date = headers.get("x-ms-date").unwrap(); - let shared_key = config.build_shared_key().unwrap(); - let auth_expected = build_authorization_header_value( - &shared_key, - &config.customer_id, - rfc1123date.to_str().unwrap(), - body.len(), - ) - .unwrap(); - let authorization = headers.get("authorization").unwrap(); - assert_eq!(authorization.to_str().unwrap(), &auth_expected); - - let log_type = headers.get("log-type").unwrap(); - assert_eq!(log_type.to_str().unwrap(), "Vector"); - - let time_generated_field = headers.get("time-generated-field").unwrap(); - let timestamp_key = log_schema().timestamp_key(); - assert_eq!( - time_generated_field.to_str().unwrap(), - timestamp_key.unwrap().to_string().as_str() - ); - - let azure_resource_id = headers.get("x-ms-azureresourceid").unwrap(); - assert_eq!( - azure_resource_id.to_str().unwrap(), - "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - ); - - assert_eq!( - &parts.uri.path_and_query().unwrap().to_string(), - "/api/logs?api-version=2016-04-01" - ); - } -} diff --git a/src/sinks/azure_monitor_logs/mod.rs b/src/sinks/azure_monitor_logs/mod.rs index 7e443e3f5dc7c..e025b912f9b13 100644 --- a/src/sinks/azure_monitor_logs/mod.rs +++ b/src/sinks/azure_monitor_logs/mod.rs @@ -1,5 +1,13 @@ +//! The Azure Monitor Logs [`vector_core::sink::VectorSink`] +//! +//! This module contains the [`vector_core::sink::VectorSink`] instance that is responsible for +//! taking a stream of [`vector_core::event::Event`] instances and forwarding them to the Azure +//! Monitor Logs service. + mod config; mod service; mod sink; +#[cfg(test)] +mod tests; pub use config::AzureMonitorLogsConfig; diff --git a/src/sinks/azure_monitor_logs/sink.rs b/src/sinks/azure_monitor_logs/sink.rs index 9db80bcb95c43..7b610cedebaa9 100644 --- a/src/sinks/azure_monitor_logs/sink.rs +++ b/src/sinks/azure_monitor_logs/sink.rs @@ -81,7 +81,7 @@ where /// Customized encoding specific to the Azure Monitor Logs sink, as the API does not support full /// 9-digit nanosecond precision timestamps. #[derive(Clone, Debug)] -struct JsonEncoding { +pub(super) struct JsonEncoding { time_generated_key: Option, encoder: (Transformer, Encoder), } @@ -174,41 +174,3 @@ impl RequestBuilder> for AzureMonitorLogsRequestBuilder { } } } - -#[cfg(test)] -pub mod tests { - use vector_core::config::log_schema; - - use super::*; - use crate::{event::LogEvent, sinks::util::encoding::Encoder}; - - fn insert_timestamp_kv(log: &mut LogEvent) -> (String, String) { - let now = chrono::Utc::now(); - - let timestamp_key = log_schema().timestamp_key().unwrap(); - let timestamp_value = now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true); - log.insert((PathPrefix::Event, timestamp_key), now); - - (timestamp_key.to_string(), timestamp_value) - } - - #[test] - fn encode_valid() { - let mut log = [("message", "hello world")] - .iter() - .copied() - .collect::(); - let (timestamp_key, timestamp_value) = insert_timestamp_kv(&mut log); - - let event = Event::from(log); - let encoder = JsonEncoding::new(Default::default(), log_schema().timestamp_key().cloned()); - let mut encoded = vec![]; - encoder.encode_input(vec![event], &mut encoded).unwrap(); - let expected_json = serde_json::json!([{ - timestamp_key: timestamp_value, - "message": "hello world" - }]); - let json: serde_json::Value = serde_json::from_slice(&encoded).unwrap(); - assert_eq!(json, expected_json); - } -} diff --git a/src/sinks/azure_monitor_logs/tests.rs b/src/sinks/azure_monitor_logs/tests.rs new file mode 100644 index 0000000000000..f4a193d30a1af --- /dev/null +++ b/src/sinks/azure_monitor_logs/tests.rs @@ -0,0 +1,275 @@ +use std::time::Duration; + +use futures::{future::ready, stream}; +use http::Response; +use hyper::body; +use lookup::PathPrefix; +use openssl::{base64, hash, pkey, sign}; +use tokio::time::timeout; +use vector_core::config::log_schema; + +use super::{ + config::{default_host, AzureMonitorLogsConfig}, + sink::JsonEncoding, +}; +use crate::{ + event::LogEvent, + sinks::{prelude::*, util::encoding::Encoder}, + test_util::{ + components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, + http::{always_200_response, spawn_blackhole_http_server}, + }, +}; + +#[test] +fn generate_config() { + crate::test_util::test_generate_config::(); +} + +#[tokio::test] +async fn component_spec_compliance() { + let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; + + let config = AzureMonitorLogsConfig { + shared_key: "ZnNkO2Zhc2RrbGZqYXNkaixmaG5tZXF3dWlsamtmYXNjZmouYXNkbmZrbHFhc2ZtYXNrbA==" + .to_string() + .into(), + ..Default::default() + }; + + let context = SinkContext::default(); + let (sink, _healthcheck) = config + .build_inner(context, mock_endpoint.into()) + .await + .unwrap(); + + let event = Event::Log(LogEvent::from("simple message")); + run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await; +} + +#[tokio::test] +async fn fails_missing_creds() { + let config: AzureMonitorLogsConfig = toml::from_str( + r#" + customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + shared_key = "" + log_type = "Vector" + azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + "#, + ) + .unwrap(); + if config.build(SinkContext::default()).await.is_ok() { + panic!("config.build failed to error"); + } +} + +#[test] +fn correct_host() { + let config_default = toml::from_str::( + r#" + customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" + log_type = "Vector" + "#, + ) + .expect("Config parsing failed without custom host"); + assert_eq!(config_default.host, default_host()); + + let config_cn = toml::from_str::( + r#" + customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" + log_type = "Vector" + host = "ods.opinsights.azure.cn" + "#, + ) + .expect("Config parsing failed with .cn custom host"); + assert_eq!(config_cn.host, "ods.opinsights.azure.cn"); +} + +#[tokio::test] +async fn fails_invalid_base64() { + let config: AzureMonitorLogsConfig = toml::from_str( + r#" + customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + shared_key = "1Qs77Vz40+iDMBBTRmROKJwnEX" + log_type = "Vector" + azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + "#, + ) + .unwrap(); + if config.build(SinkContext::default()).await.is_ok() { + panic!("config.build failed to error"); + } +} + +#[test] +fn fails_config_missing_fields() { + toml::from_str::( + r#" + customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" + azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + "#, + ) + .expect_err("Config parsing failed to error with missing log_type"); + + toml::from_str::( + r#" + customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + log_type = "Vector" + azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + "#, + ) + .expect_err("Config parsing failed to error with missing shared_key"); + + toml::from_str::( + r#" + shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" + log_type = "Vector" + "#, + ) + .expect_err("Config parsing failed to error with missing customer_id"); +} + +fn insert_timestamp_kv(log: &mut LogEvent) -> (String, String) { + let now = chrono::Utc::now(); + + let timestamp_key = log_schema().timestamp_key().unwrap(); + let timestamp_value = now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true); + log.insert((PathPrefix::Event, timestamp_key), now); + + (timestamp_key.to_string(), timestamp_value) +} + +fn build_authorization_header_value( + shared_key: &pkey::PKey, + customer_id: &str, + rfc1123date: &str, + len: usize, +) -> crate::Result { + let string_to_hash = + format!("POST\n{len}\napplication/json\nx-ms-date:{rfc1123date}\n/api/logs"); + let mut signer = sign::Signer::new(hash::MessageDigest::sha256(), shared_key)?; + signer.update(string_to_hash.as_bytes())?; + + let signature = signer.sign_to_vec()?; + let signature_base64 = base64::encode_block(&signature); + + Ok(format!("SharedKey {customer_id}:{signature_base64}")) +} + +#[tokio::test] +async fn correct_request() { + let config: AzureMonitorLogsConfig = toml::from_str( + r#" + # random GUID and random 64 Base-64 encoded bytes + customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" + log_type = "Vector" + azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + "#, + ) + .unwrap(); + + let mut log1 = [("message", "hello")].iter().copied().collect::(); + let (timestamp_key1, timestamp_value1) = insert_timestamp_kv(&mut log1); + + let mut log2 = [("message", "world")].iter().copied().collect::(); + let (timestamp_key2, timestamp_value2) = insert_timestamp_kv(&mut log2); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + let mock_endpoint = spawn_blackhole_http_server(move |request| { + let tx = tx.clone(); + async move { + tx.send(request).await.unwrap(); + Ok(Response::new(hyper::Body::empty())) + } + }) + .await; + + let context = SinkContext::default(); + let (sink, _healthcheck) = config + .build_inner(context, mock_endpoint.into()) + .await + .unwrap(); + + run_and_assert_sink_compliance(sink, stream::iter(vec![log1, log2]), &HTTP_SINK_TAGS).await; + + let request = timeout(Duration::from_millis(500), rx.recv()) + .await + .unwrap() + .unwrap(); + + let (parts, body) = request.into_parts(); + assert_eq!(&parts.method.to_string(), "POST"); + + let body = body::to_bytes(body).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body[..]).unwrap(); + let expected_json = serde_json::json!([ + { + timestamp_key1: timestamp_value1, + "message": "hello" + }, + { + timestamp_key2: timestamp_value2, + "message": "world" + } + ]); + assert_eq!(json, expected_json); + + let headers = parts.headers; + + let rfc1123date = headers.get("x-ms-date").unwrap(); + let shared_key = config.build_shared_key().unwrap(); + let auth_expected = build_authorization_header_value( + &shared_key, + &config.customer_id, + rfc1123date.to_str().unwrap(), + body.len(), + ) + .unwrap(); + let authorization = headers.get("authorization").unwrap(); + assert_eq!(authorization.to_str().unwrap(), &auth_expected); + + let log_type = headers.get("log-type").unwrap(); + assert_eq!(log_type.to_str().unwrap(), "Vector"); + + let time_generated_field = headers.get("time-generated-field").unwrap(); + let timestamp_key = log_schema().timestamp_key(); + assert_eq!( + time_generated_field.to_str().unwrap(), + timestamp_key.unwrap().to_string().as_str() + ); + + let azure_resource_id = headers.get("x-ms-azureresourceid").unwrap(); + assert_eq!( + azure_resource_id.to_str().unwrap(), + "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + ); + + assert_eq!( + &parts.uri.path_and_query().unwrap().to_string(), + "/api/logs?api-version=2016-04-01" + ); +} + +#[test] +fn encode_valid() { + let mut log = [("message", "hello world")] + .iter() + .copied() + .collect::(); + let (timestamp_key, timestamp_value) = insert_timestamp_kv(&mut log); + + let event = Event::from(log); + let encoder = JsonEncoding::new(Default::default(), log_schema().timestamp_key().cloned()); + let mut encoded = vec![]; + encoder.encode_input(vec![event], &mut encoded).unwrap(); + let expected_json = serde_json::json!([{ + timestamp_key: timestamp_value, + "message": "hello world" + }]); + let json: serde_json::Value = serde_json::from_slice(&encoded).unwrap(); + assert_eq!(json, expected_json); +} From 262d308164938109c395f55dd4570884d788d0c4 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Wed, 9 Aug 2023 09:55:15 -0400 Subject: [PATCH 4/5] back to sink_tags --- src/sinks/azure_monitor_logs/config.rs | 2 +- src/sinks/azure_monitor_logs/tests.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/sinks/azure_monitor_logs/config.rs b/src/sinks/azure_monitor_logs/config.rs index 2fe63c552b9d6..7a48b88c7b82b 100644 --- a/src/sinks/azure_monitor_logs/config.rs +++ b/src/sinks/azure_monitor_logs/config.rs @@ -114,7 +114,7 @@ pub struct AzureMonitorLogsConfig { deserialize_with = "crate::serde::bool_or_struct", skip_serializing_if = "crate::serde::skip_serializing_if_default" )] - acknowledgements: AcknowledgementsConfig, + pub acknowledgements: AcknowledgementsConfig, } impl Default for AzureMonitorLogsConfig { diff --git a/src/sinks/azure_monitor_logs/tests.rs b/src/sinks/azure_monitor_logs/tests.rs index f4a193d30a1af..af4310b7f5989 100644 --- a/src/sinks/azure_monitor_logs/tests.rs +++ b/src/sinks/azure_monitor_logs/tests.rs @@ -16,7 +16,7 @@ use crate::{ event::LogEvent, sinks::{prelude::*, util::encoding::Encoder}, test_util::{ - components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, + components::{run_and_assert_sink_compliance, SINK_TAGS}, http::{always_200_response, spawn_blackhole_http_server}, }, }; @@ -44,7 +44,7 @@ async fn component_spec_compliance() { .unwrap(); let event = Event::Log(LogEvent::from("simple message")); - run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await; + run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await; } #[tokio::test] @@ -194,7 +194,7 @@ async fn correct_request() { .await .unwrap(); - run_and_assert_sink_compliance(sink, stream::iter(vec![log1, log2]), &HTTP_SINK_TAGS).await; + run_and_assert_sink_compliance(sink, stream::iter(vec![log1, log2]), &SINK_TAGS).await; let request = timeout(Duration::from_millis(500), rx.recv()) .await From 955218f10174e054e8c44d9dacb7da6fe367e3a9 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Wed, 9 Aug 2023 10:04:28 -0400 Subject: [PATCH 5/5] merge master --- src/sinks/azure_monitor_logs/tests.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/sinks/azure_monitor_logs/tests.rs b/src/sinks/azure_monitor_logs/tests.rs index af4310b7f5989..a28e84e47b3be 100644 --- a/src/sinks/azure_monitor_logs/tests.rs +++ b/src/sinks/azure_monitor_logs/tests.rs @@ -3,7 +3,6 @@ use std::time::Duration; use futures::{future::ready, stream}; use http::Response; use hyper::body; -use lookup::PathPrefix; use openssl::{base64, hash, pkey, sign}; use tokio::time::timeout; use vector_core::config::log_schema; @@ -135,11 +134,13 @@ fn fails_config_missing_fields() { fn insert_timestamp_kv(log: &mut LogEvent) -> (String, String) { let now = chrono::Utc::now(); - let timestamp_key = log_schema().timestamp_key().unwrap(); let timestamp_value = now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true); - log.insert((PathPrefix::Event, timestamp_key), now); + log.insert(log_schema().timestamp_key_target_path().unwrap(), now); - (timestamp_key.to_string(), timestamp_value) + ( + log_schema().timestamp_key().unwrap().to_string(), + timestamp_value, + ) } fn build_authorization_header_value(