diff --git a/src/sinks/azure_monitor_logs.rs b/src/sinks/azure_monitor_logs.rs deleted file mode 100644 index ba8435499f359..0000000000000 --- a/src/sinks/azure_monitor_logs.rs +++ /dev/null @@ -1,695 +0,0 @@ -use bytes::Bytes; -use futures::{FutureExt, SinkExt}; -use http::{ - header, - header::{HeaderMap, HeaderName, HeaderValue}, - Request, StatusCode, Uri, -}; -use hyper::Body; -use lookup::lookup_v2::OptionalValuePath; -use lookup::{OwnedValuePath, PathPrefix}; -use once_cell::sync::Lazy; -use openssl::{base64, hash, pkey, sign}; -use regex::Regex; -use serde::{Deserialize, Serialize}; -use serde_json::Value as JsonValue; -use vector_common::sensitive_string::SensitiveString; -use vector_config::configurable_component; -use vector_core::schema; -use vrl::value::Kind; - -use crate::{ - codecs::Transformer, - config::{log_schema, AcknowledgementsConfig, Input, SinkConfig, SinkContext}, - event::{Event, Value}, - http::HttpClient, - sinks::{ - util::{ - http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, - BatchConfig, BoxedRawValue, JsonArrayBuffer, RealtimeSizeBasedDefaultBatchSettings, - TowerRequestConfig, - }, - Healthcheck, VectorSink, - }, - tls::{TlsConfig, TlsSettings}, -}; - -fn default_host() -> String { - "ods.opinsights.azure.com".into() -} - -/// Configuration for the `azure_monitor_logs` sink. -#[configurable_component(sink( - "azure_monitor_logs", - "Publish log events to the Azure Monitor Logs service." -))] -#[derive(Clone, Debug)] -#[serde(deny_unknown_fields)] -pub struct AzureMonitorLogsConfig { - /// The [unique identifier][uniq_id] for the Log Analytics workspace. - /// - /// [uniq_id]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#request-uri-parameters - #[configurable(metadata(docs::examples = "5ce893d9-2c32-4b6c-91a9-b0887c2de2d6"))] - #[configurable(metadata(docs::examples = "97ce69d9-b4be-4241-8dbd-d265edcf06c4"))] - pub customer_id: String, - - /// The [primary or the secondary key][shared_key] for the Log Analytics workspace. - /// - /// [shared_key]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#authorization - #[configurable(metadata( - docs::examples = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" - ))] - #[configurable(metadata(docs::examples = "${AZURE_MONITOR_SHARED_KEY_ENV_VAR}"))] - pub shared_key: SensitiveString, - - /// The [record type][record_type] of the data that is being submitted. - /// - /// Can only contain letters, numbers, and underscores (_), and may not exceed 100 characters. - /// - /// [record_type]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#request-headers - #[configurable(validation(pattern = "[a-zA-Z0-9_]{1,100}"))] - #[configurable(metadata(docs::examples = "MyTableName"))] - #[configurable(metadata(docs::examples = "MyRecordType"))] - pub log_type: String, - - /// The [Resource ID][resource_id] of the Azure resource the data should be associated with. - /// - /// [resource_id]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#request-headers - #[configurable(metadata( - docs::examples = "/subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/otherResourceGroup/providers/Microsoft.Storage/storageAccounts/examplestorage" - ))] - #[configurable(metadata( - docs::examples = "/subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/examplegroup/providers/Microsoft.SQL/servers/serverName/databases/databaseName" - ))] - pub azure_resource_id: Option, - - /// [Alternative host][alt_host] for dedicated Azure regions. - /// - /// [alt_host]: https://docs.azure.cn/en-us/articles/guidance/developerdifferences#check-endpoints-in-azure - #[configurable(metadata(docs::examples = "ods.opinsights.azure.us"))] - #[configurable(metadata(docs::examples = "ods.opinsights.azure.cn"))] - #[serde(default = "default_host")] - pub(super) host: String, - - #[configurable(derived)] - #[serde( - default, - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - pub encoding: Transformer, - - #[configurable(derived)] - #[serde(default)] - pub batch: BatchConfig, - - #[configurable(derived)] - #[serde(default)] - pub request: TowerRequestConfig, - - /// Use this option to customize the log field used as [`TimeGenerated`][1] in Azure. - /// - /// The setting of `log_schema.timestamp_key`, usually `timestamp`, is used here by default. - /// This field should be used in rare cases where `TimeGenerated` should point to a specific log - /// field. For example, use this field to set the log field `source_timestamp` as holding the - /// value that should be used as `TimeGenerated` on the Azure side. - /// - /// [1]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/log-standard-columns#timegenerated - #[configurable(metadata(docs::examples = "time_generated"))] - pub time_generated_key: Option, - - #[configurable(derived)] - pub tls: Option, - - #[configurable(derived)] - #[serde( - default, - deserialize_with = "crate::serde::bool_or_struct", - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - acknowledgements: AcknowledgementsConfig, -} - -impl Default for AzureMonitorLogsConfig { - fn default() -> Self { - Self { - customer_id: "my-customer-id".to_string(), - shared_key: Default::default(), - log_type: "MyRecordType".to_string(), - azure_resource_id: None, - host: default_host(), - encoding: Default::default(), - batch: Default::default(), - request: Default::default(), - time_generated_key: None, - tls: None, - acknowledgements: Default::default(), - } - } -} - -#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)] -#[serde(rename_all = "snake_case")] -#[derivative(Default)] -pub enum Encoding { - #[derivative(Default)] - Default, -} - -static LOG_TYPE_REGEX: Lazy = Lazy::new(|| Regex::new(r"^\w+$").unwrap()); -static LOG_TYPE_HEADER: Lazy = Lazy::new(|| HeaderName::from_static("log-type")); -static X_MS_DATE_HEADER: Lazy = Lazy::new(|| HeaderName::from_static(X_MS_DATE)); -static X_MS_AZURE_RESOURCE_HEADER: Lazy = - Lazy::new(|| HeaderName::from_static("x-ms-azureresourceid")); -static TIME_GENERATED_FIELD_HEADER: Lazy = - Lazy::new(|| HeaderName::from_static("time-generated-field")); -static CONTENT_TYPE_VALUE: Lazy = Lazy::new(|| HeaderValue::from_static(CONTENT_TYPE)); - -impl_generate_config_from_default!(AzureMonitorLogsConfig); - -/// Max number of bytes in request body -const MAX_BATCH_SIZE: usize = 30 * 1024 * 1024; -/// API endpoint for submitting logs -const RESOURCE: &str = "/api/logs"; -/// JSON content type of logs -const CONTENT_TYPE: &str = "application/json"; -/// Custom header used for signing logs -const X_MS_DATE: &str = "x-ms-date"; -/// Shared key prefix -const SHARED_KEY: &str = "SharedKey"; -/// API version -const API_VERSION: &str = "2016-04-01"; - -#[async_trait::async_trait] -#[typetag::serde(name = "azure_monitor_logs")] -impl SinkConfig for AzureMonitorLogsConfig { - async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - let batch_settings = self - .batch - .validate()? - .limit_max_bytes(MAX_BATCH_SIZE)? - .into_batch_settings()?; - - let time_generated_key = self.time_generated_key.clone().and_then(|k| k.path); - - let tls_settings = TlsSettings::from_options(&self.tls)?; - let client = HttpClient::new(Some(tls_settings), &cx.proxy)?; - - let sink = AzureMonitorLogsSink::new(self, time_generated_key)?; - let request_settings = self.request.unwrap_with(&TowerRequestConfig::default()); - - let healthcheck = healthcheck(sink.clone(), client.clone()).boxed(); - - let sink = BatchedHttpSink::new( - sink, - JsonArrayBuffer::new(batch_settings.size), - request_settings, - batch_settings.timeout, - client, - ) - .sink_map_err(|error| error!(message = "Fatal azure_monitor_logs sink error.", %error)); - - #[allow(deprecated)] - Ok((VectorSink::from_event_sink(sink), healthcheck)) - } - - fn input(&self) -> Input { - let requirements = - schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp()); - - Input::log().with_schema_requirement(requirements) - } - - fn acknowledgements(&self) -> &AcknowledgementsConfig { - &self.acknowledgements - } -} - -#[derive(Clone)] -struct AzureMonitorLogsSink { - uri: Uri, - customer_id: String, - time_generated_key: Option, - transformer: Transformer, - shared_key: pkey::PKey, - default_headers: HeaderMap, -} - -struct AzureMonitorLogsEventEncoder { - transformer: Transformer, - time_generated_key: Option, -} - -impl HttpEventEncoder for AzureMonitorLogsEventEncoder { - fn encode_event(&mut self, mut event: Event) -> Option { - self.transformer.transform(&mut event); - - // it seems like Azure Monitor doesn't support full 9-digit nanosecond precision - // adjust the timestamp format accordingly, keeping only milliseconds - let mut log = event.into_log(); - - // `.remove_timestamp()` will return the `timestamp` value regardless of location in Event or - // Metadata, the following `insert()` ensures it's encoded in the request. - let timestamp = if let Some(Value::Timestamp(ts)) = log.remove_timestamp() { - ts - } else { - chrono::Utc::now() - }; - - if let Some(timestamp_key) = &self.time_generated_key { - log.insert( - (PathPrefix::Event, timestamp_key), - JsonValue::String(timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)), - ); - } - - let entry = serde_json::json!(&log); - - Some(entry) - } -} - -#[async_trait::async_trait] -impl HttpSink for AzureMonitorLogsSink { - type Input = serde_json::Value; - type Output = Vec; - type Encoder = AzureMonitorLogsEventEncoder; - - fn build_encoder(&self) -> Self::Encoder { - AzureMonitorLogsEventEncoder { - transformer: self.transformer.clone(), - time_generated_key: self.time_generated_key.clone(), - } - } - - async fn build_request(&self, events: Self::Output) -> crate::Result> { - self.build_request_sync(events) - } -} - -impl AzureMonitorLogsSink { - fn new( - config: &AzureMonitorLogsConfig, - time_generated_key: Option, - ) -> crate::Result { - let url = format!( - "https://{}.{}{}?api-version={}", - config.customer_id, config.host, RESOURCE, API_VERSION - ); - let uri: Uri = url.parse()?; - - if config.shared_key.inner().is_empty() { - return Err("shared_key can't be an empty string".into()); - } - - let time_generated_key = - time_generated_key.or_else(|| log_schema().timestamp_key().cloned()); - - let shared_key_bytes = base64::decode_block(config.shared_key.inner())?; - let shared_key = pkey::PKey::hmac(&shared_key_bytes)?; - let mut default_headers = HeaderMap::with_capacity(3); - - if config.log_type.len() > 100 || !LOG_TYPE_REGEX.is_match(&config.log_type) { - return Err(format!( - "invalid log_type \"{}\": log type can only contain letters, numbers, and underscore (_), and may not exceed 100 characters", - config.log_type - ).into()); - } - - let log_type = HeaderValue::from_str(&config.log_type)?; - default_headers.insert(LOG_TYPE_HEADER.clone(), log_type); - - if let Some(timestamp_key) = &time_generated_key { - default_headers.insert( - TIME_GENERATED_FIELD_HEADER.clone(), - HeaderValue::try_from(timestamp_key.to_string())?, - ); - } - - if let Some(azure_resource_id) = &config.azure_resource_id { - if azure_resource_id.is_empty() { - return Err("azure_resource_id can't be an empty string".into()); - } - - default_headers.insert( - X_MS_AZURE_RESOURCE_HEADER.clone(), - HeaderValue::from_str(azure_resource_id)?, - ); - } - - default_headers.insert(header::CONTENT_TYPE, CONTENT_TYPE_VALUE.clone()); - - Ok(AzureMonitorLogsSink { - uri, - transformer: config.encoding.clone(), - customer_id: config.customer_id.clone(), - shared_key, - default_headers, - time_generated_key, - }) - } - - fn build_request_sync(&self, events: Vec) -> crate::Result> { - let body = crate::serde::json::to_bytes(&events)?.freeze(); - let len = body.len(); - - let mut request = Request::post(self.uri.clone()).body(body)?; - let rfc1123date = chrono::Utc::now() - .format("%a, %d %b %Y %H:%M:%S GMT") - .to_string(); - - let authorization = self.build_authorization_header_value(&rfc1123date, len)?; - - *request.headers_mut() = self.default_headers.clone(); - request - .headers_mut() - .insert(header::AUTHORIZATION, authorization.parse()?); - request - .headers_mut() - .insert(X_MS_DATE_HEADER.clone(), rfc1123date.parse()?); - - Ok(request) - } - - fn build_authorization_header_value( - &self, - rfc1123date: &str, - len: usize, - ) -> crate::Result { - let string_to_hash = format!( - "POST\n{}\n{}\n{}:{}\n{}", - len, CONTENT_TYPE, X_MS_DATE, rfc1123date, RESOURCE - ); - let mut signer = sign::Signer::new(hash::MessageDigest::sha256(), &self.shared_key)?; - signer.update(string_to_hash.as_bytes())?; - - let signature = signer.sign_to_vec()?; - let signature_base64 = base64::encode_block(&signature); - - Ok(format!( - "{} {}:{}", - SHARED_KEY, self.customer_id, signature_base64 - )) - } -} - -async fn healthcheck(sink: AzureMonitorLogsSink, client: HttpClient) -> crate::Result<()> { - let request = sink.build_request(vec![]).await?.map(Body::from); - - let res = client.send(request).await?; - - if res.status().is_server_error() { - return Err("Server returned a server error".into()); - } - - if res.status() == StatusCode::FORBIDDEN { - return Err("The service failed to authenticate the request. Verify that the workspace ID and connection key are valid".into()); - } - - if res.status() == StatusCode::NOT_FOUND { - return Err("Either the URL provided is incorrect, or the request is too large".into()); - } - - if res.status() == StatusCode::BAD_REQUEST { - return Err("The workspace has been closed or the request was invalid".into()); - } - - Ok(()) -} - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use futures::{future::ready, stream}; - use serde_json::value::RawValue; - - use super::*; - use crate::{ - event::LogEvent, - sinks::util::BatchSize, - test_util::{ - components::{run_and_assert_sink_compliance, SINK_TAGS}, - http::{always_200_response, spawn_blackhole_http_server}, - }, - }; - - #[test] - fn generate_config() { - crate::test_util::test_generate_config::(); - } - - #[tokio::test] - async fn component_spec_compliance() { - let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; - - // This is just a dummy shared key. - let shared_key_bytes = base64::decode_block( - "ZnNkO2Zhc2RrbGZqYXNkaixmaG5tZXF3dWlsamtmYXNjZmouYXNkbmZrbHFhc2ZtYXNrbA==", - ) - .expect("should not fail to decode base64"); - let shared_key = - pkey::PKey::hmac(&shared_key_bytes).expect("should not fail to create HMAC key"); - - let sink = AzureMonitorLogsSink { - uri: mock_endpoint, - customer_id: "weee".to_string(), - time_generated_key: log_schema().timestamp_key().cloned(), - transformer: Default::default(), - shared_key, - default_headers: HeaderMap::new(), - }; - - let context = SinkContext::default(); - let client = - HttpClient::new(None, &context.proxy).expect("should not fail to create HTTP client"); - - let request_settings = - TowerRequestConfig::default().unwrap_with(&TowerRequestConfig::default()); - - let sink = BatchedHttpSink::new( - sink, - JsonArrayBuffer::new(BatchSize::const_default()), - request_settings, - Duration::from_secs(1), - client, - ) - .sink_map_err(|error| error!(message = "Fatal azure_monitor_logs sink error.", %error)); - - let event = Event::Log(LogEvent::from("simple message")); - #[allow(deprecated)] - run_and_assert_sink_compliance( - VectorSink::from_event_sink(sink), - stream::once(ready(event)), - &SINK_TAGS, - ) - .await; - } - - fn insert_timestamp_kv(log: &mut LogEvent) -> (String, String) { - let now = chrono::Utc::now(); - - let timestamp_value = now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true); - log.insert(log_schema().timestamp_key_target_path().unwrap(), now); - - ( - log_schema().timestamp_key().unwrap().to_string(), - timestamp_value, - ) - } - - #[test] - fn encode_valid() { - let config: AzureMonitorLogsConfig = toml::from_str( - r#" - # random GUID and random 64 Base-64 encoded bytes - customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" - log_type = "Vector" - "#, - ) - .unwrap(); - - let sink = AzureMonitorLogsSink::new(&config, None).unwrap(); - let mut log = [("message", "hello world")] - .iter() - .copied() - .collect::(); - let (timestamp_key, timestamp_value) = insert_timestamp_kv(&mut log); - - let event = Event::from(log); - let mut encoder = sink.build_encoder(); - let json = encoder.encode_event(event).unwrap(); - let expected_json = serde_json::json!({ - timestamp_key: timestamp_value, - "message": "hello world" - }); - assert_eq!(json, expected_json); - } - - #[test] - fn correct_request() { - let config: AzureMonitorLogsConfig = toml::from_str( - r#" - # random GUID and random 64 Base-64 encoded bytes - customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" - log_type = "Vector" - azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - "#, - ) - .unwrap(); - - let sink = AzureMonitorLogsSink::new(&config, None).unwrap(); - let mut encoder = sink.build_encoder(); - - let mut log1 = [("message", "hello")].iter().copied().collect::(); - let (timestamp_key1, timestamp_value1) = insert_timestamp_kv(&mut log1); - - let mut log2 = [("message", "world")].iter().copied().collect::(); - let (timestamp_key2, timestamp_value2) = insert_timestamp_kv(&mut log2); - - let event1 = encoder.encode_event(Event::from(log1)).unwrap(); - let event2 = encoder.encode_event(Event::from(log2)).unwrap(); - - let json1 = serde_json::to_string(&event1).unwrap(); - let json2 = serde_json::to_string(&event2).unwrap(); - let raw1 = RawValue::from_string(json1).unwrap(); - let raw2 = RawValue::from_string(json2).unwrap(); - - let events = vec![raw1, raw2]; - - let request = sink.build_request_sync(events); - - let (parts, body) = request.unwrap().into_parts(); - assert_eq!(&parts.method.to_string(), "POST"); - - let json: serde_json::Value = serde_json::from_slice(&body[..]).unwrap(); - let expected_json = serde_json::json!([ - { - timestamp_key1: timestamp_value1, - "message": "hello" - }, - { - timestamp_key2: timestamp_value2, - "message": "world" - } - ]); - assert_eq!(json, expected_json); - - let headers = parts.headers; - let rfc1123date = headers.get("x-ms-date").unwrap(); - - let auth_expected = sink - .build_authorization_header_value(rfc1123date.to_str().unwrap(), body.len()) - .unwrap(); - - let authorization = headers.get("authorization").unwrap(); - assert_eq!(authorization.to_str().unwrap(), &auth_expected); - - let log_type = headers.get("log-type").unwrap(); - assert_eq!(log_type.to_str().unwrap(), "Vector"); - - let time_generated_field = headers.get("time-generated-field").unwrap(); - let timestamp_key = log_schema().timestamp_key(); - assert_eq!( - time_generated_field.to_str().unwrap(), - timestamp_key.unwrap().to_string().as_str() - ); - - let azure_resource_id = headers.get("x-ms-azureresourceid").unwrap(); - assert_eq!( - azure_resource_id.to_str().unwrap(), - "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - ); - - assert_eq!( - &parts.uri.to_string(), - "https://97ce69d9-b4be-4241-8dbd-d265edcf06c4.ods.opinsights.azure.com/api/logs?api-version=2016-04-01" - ); - } - - #[tokio::test] - async fn fails_missing_creds() { - let config: AzureMonitorLogsConfig = toml::from_str( - r#" - customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - shared_key = "" - log_type = "Vector" - azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - "#, - ) - .unwrap(); - if config.build(SinkContext::default()).await.is_ok() { - panic!("config.build failed to error"); - } - } - - #[test] - fn correct_host() { - let config_default = toml::from_str::( - r#" - customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" - log_type = "Vector" - "#, - ) - .expect("Config parsing failed without custom host"); - assert_eq!(config_default.host, default_host()); - - let config_cn = toml::from_str::( - r#" - customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" - log_type = "Vector" - host = "ods.opinsights.azure.cn" - "#, - ) - .expect("Config parsing failed with .cn custom host"); - assert_eq!(config_cn.host, "ods.opinsights.azure.cn"); - } - - #[tokio::test] - async fn fails_invalid_base64() { - let config: AzureMonitorLogsConfig = toml::from_str( - r#" - customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - shared_key = "1Qs77Vz40+iDMBBTRmROKJwnEX" - log_type = "Vector" - azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - "#, - ) - .unwrap(); - if config.build(SinkContext::default()).await.is_ok() { - panic!("config.build failed to error"); - } - } - - #[test] - fn fails_config_missing_fields() { - toml::from_str::( - r#" - customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" - azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - "#, - ) - .expect_err("Config parsing failed to error with missing log_type"); - - toml::from_str::( - r#" - customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - log_type = "Vector" - azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" - "#, - ) - .expect_err("Config parsing failed to error with missing shared_key"); - - toml::from_str::( - r#" - shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" - log_type = "Vector" - "#, - ) - .expect_err("Config parsing failed to error with missing customer_id"); - } -} diff --git a/src/sinks/azure_monitor_logs/config.rs b/src/sinks/azure_monitor_logs/config.rs new file mode 100644 index 0000000000000..7a48b88c7b82b --- /dev/null +++ b/src/sinks/azure_monitor_logs/config.rs @@ -0,0 +1,225 @@ +use lookup::{lookup_v2::OptionalValuePath, OwnedValuePath}; +use openssl::{base64, pkey}; + +use vector_common::sensitive_string::SensitiveString; +use vector_config::configurable_component; +use vector_core::{config::log_schema, schema}; +use vrl::value::Kind; + +use crate::{ + http::{get_http_scheme_from_uri, HttpClient}, + sinks::{ + prelude::*, + util::{http::HttpStatusRetryLogic, RealtimeSizeBasedDefaultBatchSettings, UriSerde}, + }, +}; + +use super::{ + service::{AzureMonitorLogsResponse, AzureMonitorLogsService}, + sink::AzureMonitorLogsSink, +}; + +/// Max number of bytes in request body +const MAX_BATCH_SIZE: usize = 30 * 1024 * 1024; + +pub(super) fn default_host() -> String { + "ods.opinsights.azure.com".into() +} + +/// Configuration for the `azure_monitor_logs` sink. +#[configurable_component(sink( + "azure_monitor_logs", + "Publish log events to the Azure Monitor Logs service." +))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct AzureMonitorLogsConfig { + /// The [unique identifier][uniq_id] for the Log Analytics workspace. + /// + /// [uniq_id]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#request-uri-parameters + #[configurable(metadata(docs::examples = "5ce893d9-2c32-4b6c-91a9-b0887c2de2d6"))] + #[configurable(metadata(docs::examples = "97ce69d9-b4be-4241-8dbd-d265edcf06c4"))] + pub customer_id: String, + + /// The [primary or the secondary key][shared_key] for the Log Analytics workspace. + /// + /// [shared_key]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#authorization + #[configurable(metadata( + docs::examples = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" + ))] + #[configurable(metadata(docs::examples = "${AZURE_MONITOR_SHARED_KEY_ENV_VAR}"))] + pub shared_key: SensitiveString, + + /// The [record type][record_type] of the data that is being submitted. + /// + /// Can only contain letters, numbers, and underscores (_), and may not exceed 100 characters. + /// + /// [record_type]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#request-headers + #[configurable(validation(pattern = "[a-zA-Z0-9_]{1,100}"))] + #[configurable(metadata(docs::examples = "MyTableName"))] + #[configurable(metadata(docs::examples = "MyRecordType"))] + pub log_type: String, + + /// The [Resource ID][resource_id] of the Azure resource the data should be associated with. + /// + /// [resource_id]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#request-headers + #[configurable(metadata( + docs::examples = "/subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/otherResourceGroup/providers/Microsoft.Storage/storageAccounts/examplestorage" + ))] + #[configurable(metadata( + docs::examples = "/subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/examplegroup/providers/Microsoft.SQL/servers/serverName/databases/databaseName" + ))] + pub azure_resource_id: Option, + + /// [Alternative host][alt_host] for dedicated Azure regions. + /// + /// [alt_host]: https://docs.azure.cn/en-us/articles/guidance/developerdifferences#check-endpoints-in-azure + #[configurable(metadata(docs::examples = "ods.opinsights.azure.us"))] + #[configurable(metadata(docs::examples = "ods.opinsights.azure.cn"))] + #[serde(default = "default_host")] + pub(super) host: String, + + #[configurable(derived)] + #[serde( + default, + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + pub encoding: Transformer, + + #[configurable(derived)] + #[serde(default)] + pub batch: BatchConfig, + + #[configurable(derived)] + #[serde(default)] + pub request: TowerRequestConfig, + + /// Use this option to customize the log field used as [`TimeGenerated`][1] in Azure. + /// + /// The setting of `log_schema.timestamp_key`, usually `timestamp`, is used here by default. + /// This field should be used in rare cases where `TimeGenerated` should point to a specific log + /// field. For example, use this field to set the log field `source_timestamp` as holding the + /// value that should be used as `TimeGenerated` on the Azure side. + /// + /// [1]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/log-standard-columns#timegenerated + #[configurable(metadata(docs::examples = "time_generated"))] + pub time_generated_key: Option, + + #[configurable(derived)] + pub tls: Option, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + pub acknowledgements: AcknowledgementsConfig, +} + +impl Default for AzureMonitorLogsConfig { + fn default() -> Self { + Self { + customer_id: "my-customer-id".to_string(), + shared_key: Default::default(), + log_type: "MyRecordType".to_string(), + azure_resource_id: None, + host: default_host(), + encoding: Default::default(), + batch: Default::default(), + request: Default::default(), + time_generated_key: None, + tls: None, + acknowledgements: Default::default(), + } + } +} + +impl AzureMonitorLogsConfig { + pub(super) fn build_shared_key(&self) -> crate::Result> { + if self.shared_key.inner().is_empty() { + return Err("shared_key cannot be an empty string".into()); + } + let shared_key_bytes = base64::decode_block(self.shared_key.inner())?; + let shared_key = pkey::PKey::hmac(&shared_key_bytes)?; + Ok(shared_key) + } + + fn get_time_generated_key(&self) -> Option { + self.time_generated_key + .clone() + .and_then(|k| k.path) + .or_else(|| log_schema().timestamp_key().cloned()) + } + + pub(super) async fn build_inner( + &self, + cx: SinkContext, + endpoint: UriSerde, + ) -> crate::Result<(VectorSink, Healthcheck)> { + let endpoint = endpoint.with_default_parts().uri; + let protocol = get_http_scheme_from_uri(&endpoint).to_string(); + + let batch_settings = self + .batch + .validate()? + .limit_max_bytes(MAX_BATCH_SIZE)? + .into_batcher_settings()?; + + let shared_key = self.build_shared_key()?; + let time_generated_key = self.get_time_generated_key(); + + let tls_settings = TlsSettings::from_options(&self.tls)?; + let client = HttpClient::new(Some(tls_settings), &cx.proxy)?; + + let service = AzureMonitorLogsService::new( + client, + endpoint, + self.customer_id.clone(), + self.azure_resource_id.as_deref(), + &self.log_type, + time_generated_key.clone(), + shared_key, + )?; + let healthcheck = service.healthcheck(); + + let retry_logic = + HttpStatusRetryLogic::new(|res: &AzureMonitorLogsResponse| res.http_status); + let request_settings = self.request.unwrap_with(&Default::default()); + let service = ServiceBuilder::new() + .settings(request_settings, retry_logic) + .service(service); + + let sink = AzureMonitorLogsSink::new( + batch_settings, + self.encoding.clone(), + service, + time_generated_key, + protocol, + ); + + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } +} + +impl_generate_config_from_default!(AzureMonitorLogsConfig); + +#[async_trait::async_trait] +#[typetag::serde(name = "azure_monitor_logs")] +impl SinkConfig for AzureMonitorLogsConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let endpoint = format!("https://{}.{}", self.customer_id, self.host).parse()?; + self.build_inner(cx, endpoint).await + } + + fn input(&self) -> Input { + let requirements = + schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp()); + + Input::log().with_schema_requirement(requirements) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} diff --git a/src/sinks/azure_monitor_logs/mod.rs b/src/sinks/azure_monitor_logs/mod.rs new file mode 100644 index 0000000000000..e025b912f9b13 --- /dev/null +++ b/src/sinks/azure_monitor_logs/mod.rs @@ -0,0 +1,13 @@ +//! The Azure Monitor Logs [`vector_core::sink::VectorSink`] +//! +//! This module contains the [`vector_core::sink::VectorSink`] instance that is responsible for +//! taking a stream of [`vector_core::event::Event`] instances and forwarding them to the Azure +//! Monitor Logs service. + +mod config; +mod service; +mod sink; +#[cfg(test)] +mod tests; + +pub use config::AzureMonitorLogsConfig; diff --git a/src/sinks/azure_monitor_logs/service.rs b/src/sinks/azure_monitor_logs/service.rs new file mode 100644 index 0000000000000..285edd09e9902 --- /dev/null +++ b/src/sinks/azure_monitor_logs/service.rs @@ -0,0 +1,249 @@ +use bytes::Bytes; +use http::{ + header::{self, HeaderMap}, + HeaderName, HeaderValue, Request, StatusCode, Uri, +}; +use hyper::Body; +use lookup::lookup_v2::OwnedValuePath; +use once_cell::sync::Lazy; +use openssl::{base64, hash, pkey, sign}; +use regex::Regex; +use std::task::{Context, Poll}; +use tracing::Instrument; + +use crate::{http::HttpClient, sinks::prelude::*}; + +static LOG_TYPE_REGEX: Lazy = Lazy::new(|| Regex::new(r"^\w+$").unwrap()); +static LOG_TYPE_HEADER: Lazy = Lazy::new(|| HeaderName::from_static("log-type")); +static X_MS_DATE_HEADER: Lazy = Lazy::new(|| HeaderName::from_static(X_MS_DATE)); +static X_MS_AZURE_RESOURCE_HEADER: Lazy = + Lazy::new(|| HeaderName::from_static("x-ms-azureresourceid")); +static TIME_GENERATED_FIELD_HEADER: Lazy = + Lazy::new(|| HeaderName::from_static("time-generated-field")); +static CONTENT_TYPE_VALUE: Lazy = Lazy::new(|| HeaderValue::from_static(CONTENT_TYPE)); + +/// API endpoint for submitting logs +const RESOURCE: &str = "/api/logs"; +/// JSON content type of logs +const CONTENT_TYPE: &str = "application/json"; +/// Custom header used for signing logs +const X_MS_DATE: &str = "x-ms-date"; +/// Shared key prefix +const SHARED_KEY: &str = "SharedKey"; +/// API version +const API_VERSION: &str = "2016-04-01"; + +#[derive(Debug, Clone)] +pub struct AzureMonitorLogsRequest { + pub body: Bytes, + pub finalizers: EventFinalizers, + pub metadata: RequestMetadata, +} + +impl MetaDescriptive for AzureMonitorLogsRequest { + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata + } +} + +impl Finalizable for AzureMonitorLogsRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + self.finalizers.take_finalizers() + } +} + +pub struct AzureMonitorLogsResponse { + pub http_status: StatusCode, + pub events_byte_size: GroupedCountByteSize, + pub raw_byte_size: usize, +} + +impl DriverResponse for AzureMonitorLogsResponse { + fn event_status(&self) -> EventStatus { + match self.http_status.is_success() { + true => EventStatus::Delivered, + false => EventStatus::Rejected, + } + } + + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_byte_size + } + + fn bytes_sent(&self) -> Option { + Some(self.raw_byte_size) + } +} + +/// `AzureMonitorLogsService` is a `Tower` service used to send logs to Azure. +#[derive(Debug, Clone)] +pub struct AzureMonitorLogsService { + client: HttpClient, + endpoint: Uri, + customer_id: String, + shared_key: pkey::PKey, + default_headers: HeaderMap, +} + +impl AzureMonitorLogsService { + /// Creates a new `AzureMonitorLogsService`. + pub fn new( + client: HttpClient, + endpoint: Uri, + customer_id: String, + azure_resource_id: Option<&str>, + log_type: &str, + time_generated_key: Option, + shared_key: pkey::PKey, + ) -> crate::Result { + let mut parts = endpoint.into_parts(); + parts.path_and_query = Some( + format!("{RESOURCE}?api-version={API_VERSION}") + .parse() + .expect("path and query should never fail to parse"), + ); + let endpoint = Uri::from_parts(parts)?; + + let default_headers = { + let mut headers = HeaderMap::new(); + + if log_type.len() > 100 || !LOG_TYPE_REGEX.is_match(log_type) { + return Err(format!( + "invalid log_type \"{}\": log type can only contain letters, numbers, and underscore (_), and may not exceed 100 characters", + log_type + ).into()); + } + let log_type = HeaderValue::from_str(log_type)?; + headers.insert(LOG_TYPE_HEADER.clone(), log_type); + + if let Some(timestamp_key) = time_generated_key { + headers.insert( + TIME_GENERATED_FIELD_HEADER.clone(), + HeaderValue::try_from(timestamp_key.to_string())?, + ); + } + + if let Some(azure_resource_id) = azure_resource_id { + if azure_resource_id.is_empty() { + return Err("azure_resource_id can't be an empty string".into()); + } + headers.insert( + X_MS_AZURE_RESOURCE_HEADER.clone(), + HeaderValue::from_str(azure_resource_id)?, + ); + } + + headers.insert(header::CONTENT_TYPE, CONTENT_TYPE_VALUE.clone()); + headers + }; + + Ok(Self { + client, + endpoint, + customer_id, + shared_key, + default_headers, + }) + } + + fn build_authorization_header_value( + &self, + rfc1123date: &str, + len: usize, + ) -> crate::Result { + let string_to_hash = + format!("POST\n{len}\n{CONTENT_TYPE}\n{X_MS_DATE}:{rfc1123date}\n{RESOURCE}"); + let mut signer = sign::Signer::new(hash::MessageDigest::sha256(), &self.shared_key)?; + signer.update(string_to_hash.as_bytes())?; + + let signature = signer.sign_to_vec()?; + let signature_base64 = base64::encode_block(&signature); + + Ok(format!( + "{} {}:{}", + SHARED_KEY, self.customer_id, signature_base64 + )) + } + + fn build_request(&self, body: Bytes) -> crate::Result> { + let len = body.len(); + + let mut request = Request::post(&self.endpoint).body(Body::from(body))?; + + let rfc1123date = chrono::Utc::now() + .format("%a, %d %b %Y %H:%M:%S GMT") + .to_string(); + let authorization = self.build_authorization_header_value(&rfc1123date, len)?; + + *request.headers_mut() = self.default_headers.clone(); + request + .headers_mut() + .insert(header::AUTHORIZATION, authorization.parse()?); + request + .headers_mut() + .insert(X_MS_DATE_HEADER.clone(), rfc1123date.parse()?); + + Ok(request) + } + + pub fn healthcheck(&self) -> Healthcheck { + let mut client = self.client.clone(); + let request = self.build_request(Bytes::from("[]")); + Box::pin(async move { + let request = request?; + let res = client.call(request).in_current_span().await?; + + if res.status().is_server_error() { + return Err("Server returned a server error".into()); + } + + if res.status() == StatusCode::FORBIDDEN { + return Err("The service failed to authenticate the request. Verify that the workspace ID and connection key are valid".into()); + } + + if res.status() == StatusCode::NOT_FOUND { + return Err( + "Either the URL provided is incorrect, or the request is too large".into(), + ); + } + + if res.status() == StatusCode::BAD_REQUEST { + return Err("The workspace has been closed or the request was invalid".into()); + } + + Ok(()) + }) + } +} + +impl Service for AzureMonitorLogsService { + type Response = AzureMonitorLogsResponse; + type Error = crate::Error; + type Future = BoxFuture<'static, Result>; + + // Emission of Error internal event is handled upstream by the caller. + fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + // Emission of Error internal event is handled upstream by the caller. + fn call(&mut self, request: AzureMonitorLogsRequest) -> Self::Future { + let mut client = self.client.clone(); + let http_request = self.build_request(request.body); + Box::pin(async move { + let http_request = http_request?; + let response = client.call(http_request).in_current_span().await?; + Ok(AzureMonitorLogsResponse { + http_status: response.status(), + raw_byte_size: request.metadata.request_encoded_size(), + events_byte_size: request + .metadata + .into_events_estimated_json_encoded_byte_size(), + }) + }) + } +} diff --git a/src/sinks/azure_monitor_logs/sink.rs b/src/sinks/azure_monitor_logs/sink.rs new file mode 100644 index 0000000000000..7b610cedebaa9 --- /dev/null +++ b/src/sinks/azure_monitor_logs/sink.rs @@ -0,0 +1,176 @@ +use std::{fmt::Debug, io}; + +use bytes::Bytes; +use codecs::{encoding::Framer, CharacterDelimitedEncoder, JsonSerializerConfig}; +use lookup::{OwnedValuePath, PathPrefix}; + +use crate::sinks::prelude::*; + +use super::service::AzureMonitorLogsRequest; + +pub struct AzureMonitorLogsSink { + batch_settings: BatcherSettings, + encoding: JsonEncoding, + service: S, + protocol: String, +} + +impl AzureMonitorLogsSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: Debug + Into + Send, +{ + pub fn new( + batch_settings: BatcherSettings, + transformer: Transformer, + service: S, + time_generated_key: Option, + protocol: String, + ) -> Self { + Self { + batch_settings, + encoding: JsonEncoding::new(transformer, time_generated_key), + service, + protocol, + } + } + + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + input + .batched(self.batch_settings.into_byte_size_config()) + .request_builder( + None, + AzureMonitorLogsRequestBuilder { + encoding: self.encoding, + }, + ) + .filter_map(|request| async { + match request { + Err(error) => { + emit!(SinkRequestBuildError { error }); + None + } + Ok(req) => Some(req), + } + }) + .into_driver(self.service) + .protocol(self.protocol.clone()) + .run() + .await + } +} + +#[async_trait::async_trait] +impl StreamSink for AzureMonitorLogsSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: Debug + Into + Send, +{ + async fn run( + self: Box, + input: futures_util::stream::BoxStream<'_, Event>, + ) -> Result<(), ()> { + self.run_inner(input).await + } +} + +/// Customized encoding specific to the Azure Monitor Logs sink, as the API does not support full +/// 9-digit nanosecond precision timestamps. +#[derive(Clone, Debug)] +pub(super) struct JsonEncoding { + time_generated_key: Option, + encoder: (Transformer, Encoder), +} + +impl JsonEncoding { + pub fn new(transformer: Transformer, time_generated_key: Option) -> Self { + Self { + time_generated_key, + encoder: ( + transformer, + Encoder::::new( + CharacterDelimitedEncoder::new(b',').into(), + JsonSerializerConfig::default().build().into(), + ), + ), + } + } +} + +impl crate::sinks::util::encoding::Encoder> for JsonEncoding { + fn encode_input( + &self, + mut input: Vec, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { + for event in input.iter_mut() { + let log = event.as_mut_log(); + + // `.remove_timestamp()` will return the `timestamp` value regardless of location in Event or + // Metadata, the following `insert()` ensures it's encoded in the request. + let timestamp = if let Some(Value::Timestamp(ts)) = log.remove_timestamp() { + ts + } else { + chrono::Utc::now() + }; + + if let Some(timestamp_key) = &self.time_generated_key { + log.insert( + (PathPrefix::Event, timestamp_key), + serde_json::Value::String( + timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true), + ), + ); + } + } + + self.encoder.encode_input(input, writer) + } +} + +struct AzureMonitorLogsRequestBuilder { + encoding: JsonEncoding, +} + +impl RequestBuilder> for AzureMonitorLogsRequestBuilder { + type Metadata = EventFinalizers; + type Events = Vec; + type Encoder = JsonEncoding; + type Payload = Bytes; + type Request = AzureMonitorLogsRequest; + type Error = std::io::Error; + + fn compression(&self) -> Compression { + Compression::None + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoding + } + + fn split_input( + &self, + mut events: Vec, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let finalizers = events.take_finalizers(); + let builder = RequestMetadataBuilder::from_events(&events); + (finalizers, builder, events) + } + + fn build_request( + &self, + finalizers: Self::Metadata, + request_metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + AzureMonitorLogsRequest { + body: payload.into_payload(), + finalizers, + metadata: request_metadata, + } + } +} diff --git a/src/sinks/azure_monitor_logs/tests.rs b/src/sinks/azure_monitor_logs/tests.rs new file mode 100644 index 0000000000000..a28e84e47b3be --- /dev/null +++ b/src/sinks/azure_monitor_logs/tests.rs @@ -0,0 +1,276 @@ +use std::time::Duration; + +use futures::{future::ready, stream}; +use http::Response; +use hyper::body; +use openssl::{base64, hash, pkey, sign}; +use tokio::time::timeout; +use vector_core::config::log_schema; + +use super::{ + config::{default_host, AzureMonitorLogsConfig}, + sink::JsonEncoding, +}; +use crate::{ + event::LogEvent, + sinks::{prelude::*, util::encoding::Encoder}, + test_util::{ + components::{run_and_assert_sink_compliance, SINK_TAGS}, + http::{always_200_response, spawn_blackhole_http_server}, + }, +}; + +#[test] +fn generate_config() { + crate::test_util::test_generate_config::(); +} + +#[tokio::test] +async fn component_spec_compliance() { + let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; + + let config = AzureMonitorLogsConfig { + shared_key: "ZnNkO2Zhc2RrbGZqYXNkaixmaG5tZXF3dWlsamtmYXNjZmouYXNkbmZrbHFhc2ZtYXNrbA==" + .to_string() + .into(), + ..Default::default() + }; + + let context = SinkContext::default(); + let (sink, _healthcheck) = config + .build_inner(context, mock_endpoint.into()) + .await + .unwrap(); + + let event = Event::Log(LogEvent::from("simple message")); + run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await; +} + +#[tokio::test] +async fn fails_missing_creds() { + let config: AzureMonitorLogsConfig = toml::from_str( + r#" + customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + shared_key = "" + log_type = "Vector" + azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + "#, + ) + .unwrap(); + if config.build(SinkContext::default()).await.is_ok() { + panic!("config.build failed to error"); + } +} + +#[test] +fn correct_host() { + let config_default = toml::from_str::( + r#" + customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" + log_type = "Vector" + "#, + ) + .expect("Config parsing failed without custom host"); + assert_eq!(config_default.host, default_host()); + + let config_cn = toml::from_str::( + r#" + customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" + log_type = "Vector" + host = "ods.opinsights.azure.cn" + "#, + ) + .expect("Config parsing failed with .cn custom host"); + assert_eq!(config_cn.host, "ods.opinsights.azure.cn"); +} + +#[tokio::test] +async fn fails_invalid_base64() { + let config: AzureMonitorLogsConfig = toml::from_str( + r#" + customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + shared_key = "1Qs77Vz40+iDMBBTRmROKJwnEX" + log_type = "Vector" + azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + "#, + ) + .unwrap(); + if config.build(SinkContext::default()).await.is_ok() { + panic!("config.build failed to error"); + } +} + +#[test] +fn fails_config_missing_fields() { + toml::from_str::( + r#" + customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" + azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + "#, + ) + .expect_err("Config parsing failed to error with missing log_type"); + + toml::from_str::( + r#" + customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + log_type = "Vector" + azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + "#, + ) + .expect_err("Config parsing failed to error with missing shared_key"); + + toml::from_str::( + r#" + shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" + log_type = "Vector" + "#, + ) + .expect_err("Config parsing failed to error with missing customer_id"); +} + +fn insert_timestamp_kv(log: &mut LogEvent) -> (String, String) { + let now = chrono::Utc::now(); + + let timestamp_value = now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true); + log.insert(log_schema().timestamp_key_target_path().unwrap(), now); + + ( + log_schema().timestamp_key().unwrap().to_string(), + timestamp_value, + ) +} + +fn build_authorization_header_value( + shared_key: &pkey::PKey, + customer_id: &str, + rfc1123date: &str, + len: usize, +) -> crate::Result { + let string_to_hash = + format!("POST\n{len}\napplication/json\nx-ms-date:{rfc1123date}\n/api/logs"); + let mut signer = sign::Signer::new(hash::MessageDigest::sha256(), shared_key)?; + signer.update(string_to_hash.as_bytes())?; + + let signature = signer.sign_to_vec()?; + let signature_base64 = base64::encode_block(&signature); + + Ok(format!("SharedKey {customer_id}:{signature_base64}")) +} + +#[tokio::test] +async fn correct_request() { + let config: AzureMonitorLogsConfig = toml::from_str( + r#" + # random GUID and random 64 Base-64 encoded bytes + customer_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + shared_key = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==" + log_type = "Vector" + azure_resource_id = "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + "#, + ) + .unwrap(); + + let mut log1 = [("message", "hello")].iter().copied().collect::(); + let (timestamp_key1, timestamp_value1) = insert_timestamp_kv(&mut log1); + + let mut log2 = [("message", "world")].iter().copied().collect::(); + let (timestamp_key2, timestamp_value2) = insert_timestamp_kv(&mut log2); + + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + let mock_endpoint = spawn_blackhole_http_server(move |request| { + let tx = tx.clone(); + async move { + tx.send(request).await.unwrap(); + Ok(Response::new(hyper::Body::empty())) + } + }) + .await; + + let context = SinkContext::default(); + let (sink, _healthcheck) = config + .build_inner(context, mock_endpoint.into()) + .await + .unwrap(); + + run_and_assert_sink_compliance(sink, stream::iter(vec![log1, log2]), &SINK_TAGS).await; + + let request = timeout(Duration::from_millis(500), rx.recv()) + .await + .unwrap() + .unwrap(); + + let (parts, body) = request.into_parts(); + assert_eq!(&parts.method.to_string(), "POST"); + + let body = body::to_bytes(body).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body[..]).unwrap(); + let expected_json = serde_json::json!([ + { + timestamp_key1: timestamp_value1, + "message": "hello" + }, + { + timestamp_key2: timestamp_value2, + "message": "world" + } + ]); + assert_eq!(json, expected_json); + + let headers = parts.headers; + + let rfc1123date = headers.get("x-ms-date").unwrap(); + let shared_key = config.build_shared_key().unwrap(); + let auth_expected = build_authorization_header_value( + &shared_key, + &config.customer_id, + rfc1123date.to_str().unwrap(), + body.len(), + ) + .unwrap(); + let authorization = headers.get("authorization").unwrap(); + assert_eq!(authorization.to_str().unwrap(), &auth_expected); + + let log_type = headers.get("log-type").unwrap(); + assert_eq!(log_type.to_str().unwrap(), "Vector"); + + let time_generated_field = headers.get("time-generated-field").unwrap(); + let timestamp_key = log_schema().timestamp_key(); + assert_eq!( + time_generated_field.to_str().unwrap(), + timestamp_key.unwrap().to_string().as_str() + ); + + let azure_resource_id = headers.get("x-ms-azureresourceid").unwrap(); + assert_eq!( + azure_resource_id.to_str().unwrap(), + "97ce69d9-b4be-4241-8dbd-d265edcf06c4" + ); + + assert_eq!( + &parts.uri.path_and_query().unwrap().to_string(), + "/api/logs?api-version=2016-04-01" + ); +} + +#[test] +fn encode_valid() { + let mut log = [("message", "hello world")] + .iter() + .copied() + .collect::(); + let (timestamp_key, timestamp_value) = insert_timestamp_kv(&mut log); + + let event = Event::from(log); + let encoder = JsonEncoding::new(Default::default(), log_schema().timestamp_key().cloned()); + let mut encoded = vec![]; + encoder.encode_input(vec![event], &mut encoded).unwrap(); + let expected_json = serde_json::json!([{ + timestamp_key: timestamp_value, + "message": "hello world" + }]); + let json: serde_json::Value = serde_json::from_slice(&encoded).unwrap(); + assert_eq!(json, expected_json); +}