From 68592e11ca33dd1c87d3d39293e9b2d570c24533 Mon Sep 17 00:00:00 2001 From: Timur Makarchuk Date: Sun, 11 Dec 2022 20:23:08 +0600 Subject: [PATCH 01/16] feat: migrate to `async_nats` client --- Cargo.lock | 171 +++++++++++++++--------------------- Cargo.toml | 6 +- src/internal_events/nats.rs | 8 +- src/nats.rs | 49 +++++------ src/sinks/nats.rs | 13 +-- src/sources/nats.rs | 34 +++---- 6 files changed, 116 insertions(+), 165 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f0821ae871f77..6302ef541a3cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -451,6 +451,40 @@ dependencies = [ "futures-lite", ] +[[package]] +name = "async-nats" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2407f5e2adbf23226bbb3e20eaee46bfbf7b502a611bebb943641f85b3b5e94e" +dependencies = [ + "base64", + "base64-url", + "bytes 1.3.0", + "futures 0.3.25", + "http", + "itertools", + "itoa 1.0.4", + "lazy_static", + "nkeys", + "nuid", + "once_cell", + "regex", + "ring", + "rustls-native-certs", + "rustls-pemfile 1.0.1", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "subslice", + "time", + "tokio", + "tokio-retry", + "tokio-rustls", + "tracing 0.1.37", + "url", +] + [[package]] name = "async-net" version = "1.7.0" @@ -1334,8 +1368,8 @@ dependencies = [ "hyperlocal", "log", "pin-project-lite", - "rustls 0.20.7", - "rustls-native-certs 0.6.2", + "rustls", + "rustls-native-certs", "rustls-pemfile 1.0.1", "serde", "serde_derive", @@ -1345,7 +1379,7 @@ dependencies = [ "tokio", "tokio-util", "url", - "webpki 0.22.0", + "webpki", "webpki-roots", "winapi", ] @@ -3700,8 +3734,8 @@ dependencies = [ "http", "hyper", "log", - "rustls 0.20.7", - "rustls-native-certs 0.6.2", + "rustls", + "rustls-native-certs", "tokio", "tokio-rustls", ] @@ -4007,12 +4041,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "json" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" - [[package]] name = "json-patch" version = "0.2.6" @@ -4752,7 +4780,7 @@ dependencies = [ "percent-encoding", "rand 0.8.5", "rustc_version_runtime", - "rustls 0.20.7", + "rustls", "rustls-pemfile 0.3.0", "serde", "serde_bytes", @@ -4816,42 +4844,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "nats" -version = "0.23.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3d877cd2e71146efa7065300fc5f5da967f938694b4d65e8bc64cc4a409092c" -dependencies = [ - "base64", - "base64-url", - "blocking", - "crossbeam-channel", - "fastrand", - "itoa 1.0.4", - "json", - "lazy_static", - "libc", - "log", - "memchr", - "nkeys", - "nuid", - "once_cell", - "parking_lot", - "regex", - "ring", - "rustls 0.19.1", - "rustls-native-certs 0.5.0", - "rustls-pemfile 0.2.1", - "serde", - "serde_json", - "serde_nanos", - "serde_repr", - "time", - "url", - "webpki 0.21.4", - "winapi", -] - [[package]] name = "ndarray" version = "0.15.6" @@ -6426,7 +6418,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.20.7", + "rustls", "rustls-pemfile 1.0.1", "serde", "serde_json", @@ -6652,19 +6644,6 @@ dependencies = [ "windows-sys 0.42.0", ] -[[package]] -name = "rustls" -version = "0.19.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" -dependencies = [ - "base64", - "log", - "ring", - "sct 0.6.1", - "webpki 0.21.4", -] - [[package]] name = "rustls" version = "0.20.7" @@ -6673,20 +6652,8 @@ checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c" dependencies = [ "log", "ring", - "sct 0.7.0", - "webpki 0.22.0", -] - -[[package]] -name = "rustls-native-certs" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092" -dependencies = [ - "openssl-probe", - "rustls 0.19.1", - "schannel", - "security-framework", + "sct", + "webpki", ] [[package]] @@ -6857,16 +6824,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898" -[[package]] -name = "sct" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "sct" version = "0.7.0" @@ -7547,6 +7504,15 @@ dependencies = [ "syn", ] +[[package]] +name = "subslice" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a8e4809a3bb02de01f1f7faf1ba01a83af9e8eabcd4d31dd6e413d14d56aae" +dependencies = [ + "memchr", +] + [[package]] name = "subtle" version = "2.4.1" @@ -7946,15 +7912,26 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project", + "rand 0.8.5", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.23.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" dependencies = [ - "rustls 0.20.7", + "rustls", "tokio", - "webpki 0.22.0", + "webpki", ] [[package]] @@ -8002,7 +7979,7 @@ checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd" dependencies = [ "futures-util", "log", - "rustls 0.20.7", + "rustls", "tokio", "tungstenite 0.18.0", ] @@ -8053,7 +8030,7 @@ dependencies = [ "pin-project", "prost", "prost-derive", - "rustls-native-certs 0.6.2", + "rustls-native-certs", "rustls-pemfile 1.0.1", "tokio", "tokio-rustls", @@ -8770,6 +8747,7 @@ dependencies = [ "async-compression", "async-graphql", "async-graphql-warp", + "async-nats", "async-stream", "async-trait", "atty", @@ -8859,7 +8837,6 @@ dependencies = [ "metrics-tracing-context", "mlua", "mongodb", - "nats", "nix 0.26.1", "nkeys", "nom", @@ -9601,16 +9578,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "webpki" -version = "0.21.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "webpki" version = "0.22.0" @@ -9627,7 +9594,7 @@ version = "0.22.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "368bfe657969fb01238bb756d351dcade285e0f6fcbd36dcb23359a5169975be" dependencies = [ - "webpki 0.22.0", + "webpki", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 76c8bf52d68f6..9f1dbcc5bf084 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -287,7 +287,7 @@ lru = { version = "0.8.1", default-features = false, optional = true } maxminddb = { version = "0.23.0", default-features = false, optional = true } md-5 = { version = "0.10", default-features = false, optional = true } mongodb = { version = "2.3.1", default-features = false, features = ["tokio-runtime"], optional = true } -nats = { version = "0.23.1", default-features = false, optional = true } +async-nats = { version = "0.24.0", default-features = false, optional = true } nkeys = { version = "0.2.0", default-features = false, optional = true } nom = { version = "7.1.1", default-features = false, optional = true } notify = { version = "5.0.0", default-features = false, features = ["macos_fsevent"] } @@ -537,7 +537,7 @@ sources-kafka = ["dep:rdkafka"] sources-kubernetes_logs = ["dep:file-source", "kubernetes", "transforms-reduce"] sources-logstash = ["sources-utils-net-tcp", "tokio-util/net"] sources-mongodb_metrics = ["dep:mongodb"] -sources-nats = ["dep:nats", "dep:nkeys"] +sources-nats = ["dep:async-nats", "dep:nkeys"] sources-nginx_metrics = ["dep:nom"] sources-opentelemetry = ["dep:hex", "dep:opentelemetry-proto", "dep:prost-types", "sources-http_server", "sources-utils-http", "sources-vector"] sources-postgresql_metrics = ["dep:postgres-openssl", "dep:tokio-postgres"] @@ -690,7 +690,7 @@ sinks-influxdb = [] sinks-kafka = ["dep:rdkafka"] sinks-logdna = [] sinks-loki = ["loki-logproto"] -sinks-nats = ["dep:nats", "dep:nkeys"] +sinks-nats = ["dep:async-nats", "dep:nkeys"] sinks-new_relic_logs = ["sinks-http"] sinks-new_relic = [] sinks-papertrail = ["dep:syslog"] diff --git a/src/internal_events/nats.rs b/src/internal_events/nats.rs index 94e06da098108..9a2940c1744ed 100644 --- a/src/internal_events/nats.rs +++ b/src/internal_events/nats.rs @@ -1,5 +1,3 @@ -use std::io::Error; - use crate::emit; use metrics::counter; use vector_common::internal_event::{ @@ -7,11 +5,9 @@ use vector_common::internal_event::{ }; use vector_core::internal_event::InternalEvent; -use super::prelude::io_error_code; - #[derive(Debug)] pub struct NatsEventSendError { - pub error: Error, + pub error: async_nats::PublishError, } impl InternalEvent for NatsEventSendError { @@ -21,14 +17,12 @@ impl InternalEvent for NatsEventSendError { message = reason, error = %self.error, error_type = error_type::WRITER_FAILED, - error_code = io_error_code(&self.error), stage = error_stage::SENDING, internal_log_rate_limit = true, ); counter!( "component_errors_total", 1, "error_type" => error_type::WRITER_FAILED, - "error_code" => io_error_code(&self.error), "stage" => error_stage::SENDING, ); emit!(ComponentEventsDropped:: { count: 1, reason }); diff --git a/src/nats.rs b/src/nats.rs index 7f2998dc44793..4def6c7ef4649 100644 --- a/src/nats.rs +++ b/src/nats.rs @@ -13,6 +13,8 @@ pub enum NatsConfigError { TlsMissingKey, #[snafu(display("NATS TLS Config Error: missing cert"))] TlsMissingCert, + #[snafu(display("NATS Credentials file error"))] + CredentialsFileError{source: std::io::Error} } /// Configuration of the authentication strategy when interacting with NATS. @@ -109,29 +111,27 @@ pub(crate) struct NatsAuthNKey { } impl NatsAuthConfig { - pub(crate) fn to_nats_options(&self) -> Result { + pub(crate) fn to_nats_options(&self) -> Result { match self { NatsAuthConfig::UserPassword { user_password } => { - Ok(nats::asynk::Options::with_user_pass( - user_password.user.as_str(), - user_password.password.inner(), + Ok(async_nats::ConnectOptions::with_user_and_password( + user_password.user.clone(), + user_password.password.inner().to_string(), )) } - NatsAuthConfig::CredentialsFile { credentials_file } => Ok( - nats::asynk::Options::with_credentials(&credentials_file.path), - ), + NatsAuthConfig::CredentialsFile { credentials_file } => { + async_nats::ConnectOptions::with_credentials( + &std::fs::read_to_string(credentials_file.path.clone()) + .context(CredentialsFileSnafu)? + ).context(CredentialsFileSnafu) + }, NatsAuthConfig::Nkey { nkey } => nkeys::KeyPair::from_seed(&nkey.seed) .context(AuthConfigSnafu) - .map(|kp| { - // The following unwrap is safe because the only way the sign method can fail is if - // keypair does not contain a seed. We are constructing the keypair from a seed in - // the preceding line. - nats::asynk::Options::with_nkey(&nkey.nkey, move |nonce| { - kp.sign(nonce).unwrap() - }) + .map(|_kp| { + async_nats::ConnectOptions::with_nkey(nkey.nkey.clone()) }), NatsAuthConfig::Token { token } => { - Ok(nats::asynk::Options::with_token(token.value.inner())) + Ok(async_nats::ConnectOptions::with_token(token.value.inner().to_string())) } } } @@ -141,35 +141,32 @@ pub(crate) fn from_tls_auth_config( connection_name: &str, auth_config: &Option, tls_config: &Option, -) -> Result { +) -> Result { let nats_options = match &auth_config { - None => nats::asynk::Options::new(), + None => async_nats::ConnectOptions::new(), Some(auth) => auth.to_nats_options()?, }; let nats_options = nats_options - .with_name(connection_name) - // Set reconnect_buffer_size on the nats client to 0 bytes so that the - // client doesn't buffer internally (to avoid message loss). - .reconnect_buffer_size(0); + .name(connection_name); match tls_config { None => Ok(nats_options), Some(tls_config) => { let tls_enabled = tls_config.enabled.unwrap_or(false); - let nats_options = nats_options.tls_required(tls_enabled); + let nats_options = nats_options.require_tls(tls_enabled); if !tls_enabled { return Ok(nats_options); } let nats_options = match &tls_config.options.ca_file { None => nats_options, - Some(ca_file) => nats_options.add_root_certificate(ca_file), + Some(ca_file) => nats_options.add_root_certificates(ca_file.clone()), }; let nats_options = match (&tls_config.options.crt_file, &tls_config.options.key_file) { (None, None) => nats_options, - (Some(crt_file), Some(key_file)) => nats_options.client_cert(crt_file, key_file), + (Some(crt_file), Some(key_file)) => nats_options.add_client_certificate(crt_file.clone(), key_file.clone()), (Some(_crt_file), None) => return Err(NatsConfigError::TlsMissingKey), (None, Some(_key_file)) => return Err(NatsConfigError::TlsMissingCert), }; @@ -182,7 +179,7 @@ pub(crate) fn from_tls_auth_config( mod tests { use super::*; - fn parse_auth(s: &str) -> Result { + fn parse_auth(s: &str) -> Result { toml::from_str(s) .map_err(Into::into) .and_then(|config: NatsAuthConfig| config.to_nats_options().map_err(Into::into)) @@ -260,7 +257,7 @@ mod tests { parse_auth( r#" strategy = "credentials_file" - credentials_file.path = "/path/to/nowhere" + credentials_file.path = "tests/data/nats/nats.creds" "#, ) .unwrap(); diff --git a/src/sinks/nats.rs b/src/sinks/nats.rs index 0d1397803c399..c76a5c13207a7 100644 --- a/src/sinks/nats.rs +++ b/src/sinks/nats.rs @@ -115,7 +115,7 @@ impl SinkConfig for NatsSinkConfig { } } -impl std::convert::TryFrom<&NatsSinkConfig> for nats::asynk::Options { +impl std::convert::TryFrom<&NatsSinkConfig> for async_nats::ConnectOptions { type Error = NatsConfigError; fn try_from(config: &NatsSinkConfig) -> Result { @@ -124,8 +124,8 @@ impl std::convert::TryFrom<&NatsSinkConfig> for nats::asynk::Options { } impl NatsSinkConfig { - async fn connect(&self) -> Result { - let options: nats::asynk::Options = self.try_into().context(ConfigSnafu)?; + async fn connect(&self) -> Result { + let options: async_nats::ConnectOptions = self.try_into().context(ConfigSnafu)?; options.connect(&self.url).await.context(ConnectSnafu) } @@ -138,7 +138,7 @@ async fn healthcheck(config: NatsSinkConfig) -> crate::Result<()> { pub struct NatsSink { transformer: Transformer, encoder: Encoder<()>, - connection: nats::asynk::Connection, + connection: async_nats::Client, subject: Template, } @@ -191,7 +191,8 @@ impl StreamSink for NatsSink { continue; } - match self.connection.publish(&subject, &bytes).await { + let message_size = bytes.len(); + match self.connection.publish(subject.clone(), bytes.freeze()).await { Err(error) => { finalizers.update_status(EventStatus::Errored); @@ -201,7 +202,7 @@ impl StreamSink for NatsSink { finalizers.update_status(EventStatus::Delivered); events_sent.emit(CountByteSize(1, event_byte_size)); - bytes_sent.emit(ByteSize(bytes.len())); + bytes_sent.emit(ByteSize(message_size)); } } } diff --git a/src/sources/nats.rs b/src/sources/nats.rs index 073fcf9700b48..83972db19c6b0 100644 --- a/src/sources/nats.rs +++ b/src/sources/nats.rs @@ -1,6 +1,6 @@ use chrono::Utc; use codecs::decoding::{DeserializerConfig, FramingConfig, StreamDecodingError}; -use futures::{pin_mut, stream, Stream, StreamExt}; +use futures::{pin_mut, StreamExt}; use snafu::{ResultExt, Snafu}; use tokio_util::codec::FramedRead; use vector_common::internal_event::{ @@ -28,7 +28,7 @@ enum BuildError { #[snafu(display("NATS Connect Error: {}", source))] Connect { source: std::io::Error }, #[snafu(display("NATS Subscribe Error: {}", source))] - Subscribe { source: std::io::Error }, + Subscribe { source: async_nats::Error }, } /// Configuration for the `nats` source. @@ -121,13 +121,13 @@ impl SourceConfig for NatsSourceConfig { } impl NatsSourceConfig { - async fn connect(&self) -> Result { - let options: nats::asynk::Options = self.try_into().context(ConfigSnafu)?; + async fn connect(&self) -> Result { + let options: async_nats::ConnectOptions = self.try_into().context(ConfigSnafu)?; options.connect(&self.url).await.context(ConnectSnafu) } } -impl std::convert::TryFrom<&NatsSourceConfig> for nats::asynk::Options { +impl std::convert::TryFrom<&NatsSourceConfig> for async_nats::ConnectOptions { type Error = NatsConfigError; fn try_from(config: &NatsSourceConfig) -> Result { @@ -135,29 +135,21 @@ impl std::convert::TryFrom<&NatsSourceConfig> for nats::asynk::Options { } } -fn get_subscription_stream( - subscription: nats::asynk::Subscription, -) -> impl Stream { - stream::unfold(subscription, |subscription| async move { - subscription.next().await.map(|msg| (msg, subscription)) - }) -} - async fn nats_source( // Take ownership of the connection so it doesn't get dropped. - _connection: nats::asynk::Connection, - subscription: nats::asynk::Subscription, + _connection: async_nats::Client, + subscriber: async_nats::Subscriber, decoder: Decoder, log_namespace: LogNamespace, shutdown: ShutdownSignal, mut out: SourceSender, ) -> Result<(), ()> { - let stream = get_subscription_stream(subscription).take_until(shutdown); + let stream = subscriber.take_until(shutdown); pin_mut!(stream); let bytes_received = register!(BytesReceived::from(Protocol::TCP)); while let Some(msg) = stream.next().await { - bytes_received.emit(ByteSize(msg.data.len())); - let mut stream = FramedRead::new(msg.data.as_ref(), decoder.clone()); + bytes_received.emit(ByteSize(msg.payload.len())); + let mut stream = FramedRead::new(msg.payload.as_ref(), decoder.clone()); while let Some(next) = stream.next().await { match next { Ok((events, _byte_size)) => { @@ -199,12 +191,12 @@ async fn nats_source( async fn create_subscription( config: &NatsSourceConfig, -) -> Result<(nats::asynk::Connection, nats::asynk::Subscription), BuildError> { +) -> Result<(async_nats::Client, async_nats::Subscriber), BuildError> { let nc = config.connect().await?; let subscription = match &config.queue { - None => nc.subscribe(&config.subject).await, - Some(queue) => nc.queue_subscribe(&config.subject, queue).await, + None => nc.subscribe(config.subject.clone()).await, + Some(queue) => nc.queue_subscribe(config.subject.clone(), queue.clone()).await, }; let subscription = subscription.context(SubscribeSnafu)?; From c9b320c4eab454ce3075b579b5e6d75b119d0c05 Mon Sep 17 00:00:00 2001 From: Timur Makarchuk Date: Tue, 13 Dec 2022 23:39:20 +0600 Subject: [PATCH 02/16] fix: port nats-integration-tests to async_nats --- src/nats.rs | 26 +++++++++++++------------- src/sinks/nats.rs | 14 +++++++++----- src/sources/nats.rs | 11 +++++++++-- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/src/nats.rs b/src/nats.rs index 4def6c7ef4649..8603fc68e1ec8 100644 --- a/src/nats.rs +++ b/src/nats.rs @@ -14,7 +14,7 @@ pub enum NatsConfigError { #[snafu(display("NATS TLS Config Error: missing cert"))] TlsMissingCert, #[snafu(display("NATS Credentials file error"))] - CredentialsFileError{source: std::io::Error} + CredentialsFileError { source: std::io::Error }, } /// Configuration of the authentication strategy when interacting with NATS. @@ -122,17 +122,16 @@ impl NatsAuthConfig { NatsAuthConfig::CredentialsFile { credentials_file } => { async_nats::ConnectOptions::with_credentials( &std::fs::read_to_string(credentials_file.path.clone()) - .context(CredentialsFileSnafu)? - ).context(CredentialsFileSnafu) - }, + .context(CredentialsFileSnafu)?, + ) + .context(CredentialsFileSnafu) + } NatsAuthConfig::Nkey { nkey } => nkeys::KeyPair::from_seed(&nkey.seed) .context(AuthConfigSnafu) - .map(|_kp| { - async_nats::ConnectOptions::with_nkey(nkey.nkey.clone()) - }), - NatsAuthConfig::Token { token } => { - Ok(async_nats::ConnectOptions::with_token(token.value.inner().to_string())) - } + .map(|_kp| async_nats::ConnectOptions::with_nkey(nkey.nkey.clone())), + NatsAuthConfig::Token { token } => Ok(async_nats::ConnectOptions::with_token( + token.value.inner().to_string(), + )), } } } @@ -147,8 +146,7 @@ pub(crate) fn from_tls_auth_config( Some(auth) => auth.to_nats_options()?, }; - let nats_options = nats_options - .name(connection_name); + let nats_options = nats_options.name(connection_name); match tls_config { None => Ok(nats_options), @@ -166,7 +164,9 @@ pub(crate) fn from_tls_auth_config( let nats_options = match (&tls_config.options.crt_file, &tls_config.options.key_file) { (None, None) => nats_options, - (Some(crt_file), Some(key_file)) => nats_options.add_client_certificate(crt_file.clone(), key_file.clone()), + (Some(crt_file), Some(key_file)) => { + nats_options.add_client_certificate(crt_file.clone(), key_file.clone()) + } (Some(_crt_file), None) => return Err(NatsConfigError::TlsMissingKey), (None, Some(_key_file)) => return Err(NatsConfigError::TlsMissingCert), }; diff --git a/src/sinks/nats.rs b/src/sinks/nats.rs index c76a5c13207a7..86fe5f172bdb6 100644 --- a/src/sinks/nats.rs +++ b/src/sinks/nats.rs @@ -192,7 +192,11 @@ impl StreamSink for NatsSink { } let message_size = bytes.len(); - match self.connection.publish(subject.clone(), bytes.freeze()).await { + match self + .connection + .publish(subject.clone(), bytes.freeze()) + .await + { Err(error) => { finalizers.update_status(EventStatus::Errored); @@ -253,8 +257,8 @@ mod integration_tests { .connect() .await .expect("failed to connect with test consumer"); - let sub = consumer - .subscribe(&subject) + let mut sub = consumer + .subscribe(subject) .await .expect("failed to subscribe with test consumer"); @@ -266,11 +270,11 @@ mod integration_tests { // Unsubscribe from the channel. thread::sleep(Duration::from_secs(3)); - sub.drain().await.unwrap(); + sub.unsubscribe().await.unwrap(); let mut output: Vec = Vec::new(); while let Some(msg) = sub.next().await { - output.push(String::from_utf8_lossy(&msg.data).to_string()) + output.push(String::from_utf8_lossy(&msg.payload).to_string()) } assert_eq!(output.len(), input.len()); diff --git a/src/sources/nats.rs b/src/sources/nats.rs index 83972db19c6b0..cc8d83ca83e69 100644 --- a/src/sources/nats.rs +++ b/src/sources/nats.rs @@ -196,7 +196,10 @@ async fn create_subscription( let subscription = match &config.queue { None => nc.subscribe(config.subject.clone()).await, - Some(queue) => nc.queue_subscribe(config.subject.clone(), queue.clone()).await, + Some(queue) => { + nc.queue_subscribe(config.subject.clone(), queue.clone()) + .await + } }; let subscription = subscription.context(SubscribeSnafu)?; @@ -273,6 +276,7 @@ mod tests { mod integration_tests { #![allow(clippy::print_stdout)] //tests + use bytes::Bytes; use vector_core::config::log_schema; use super::*; @@ -306,7 +310,10 @@ mod integration_tests { ShutdownSignal::noop(), tx, )); - nc_pub.publish(&subject, msg).await.unwrap(); + nc_pub + .publish(subject, Bytes::from_static(msg.as_bytes())) + .await + .unwrap(); collect_n(rx, 1).await }) From cc875069ccd7ed831e810f8cec571ba1ab04b39a Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Sun, 6 Aug 2023 15:50:38 +0200 Subject: [PATCH 03/16] Bump async-nats to v0.28 --- Cargo.lock | 42 ++++++++++++++++++++++++++---------------- Cargo.toml | 2 +- 2 files changed, 27 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca940e74adaf0..47d57c4240e84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -648,9 +648,9 @@ dependencies = [ [[package]] name = "async-nats" -version = "0.24.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2407f5e2adbf23226bbb3e20eaee46bfbf7b502a611bebb943641f85b3b5e94e" +checksum = "4ddf2d540de89dc290f7abdd8a3889af74083eaf677ad845718542670d792847" dependencies = [ "base64 0.13.1", "base64-url", @@ -2827,6 +2827,15 @@ dependencies = [ "const-oid", ] +[[package]] +name = "deranged" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7684a49fb1af197853ef7b2ee694bc1f5b4179556f1e5710e1760c5db6f5e929" +dependencies = [ + "serde", +] + [[package]] name = "derivative" version = "2.2.0" @@ -7334,11 +7343,11 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "1.0.1" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55" +checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" dependencies = [ - "base64 0.13.1", + "base64 0.21.2", ] [[package]] @@ -7631,9 +7640,9 @@ dependencies = [ [[package]] name = "serde_nanos" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e44969a61f5d316be20a42ff97816efb3b407a924d06824c3d8a49fa8450de0e" +checksum = "8ae801b7733ca8d6a2b580debe99f67f36826a0f5b8a36055dc6bc40f8d6bc71" dependencies = [ "serde", ] @@ -7660,13 +7669,13 @@ dependencies = [ [[package]] name = "serde_repr" -version = "0.1.9" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fe39d9fbb0ebf5eb2c7cb7e2a47e4f462fad1379f1166b8ae49ad9eae89a7ca" +checksum = "8725e1dfadb3a50f7e5ce0b1a540466f6ed3fe7a0fca2ac2b8b831d31316bd00" dependencies = [ "proc-macro2 1.0.66", "quote 1.0.32", - "syn 1.0.109", + "syn 2.0.28", ] [[package]] @@ -8451,10 +8460,11 @@ dependencies = [ [[package]] name = "time" -version = "0.3.17" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a561bf4617eebd33bca6434b988f39ed798e527f51a1e797d0ee4f61c0a38376" +checksum = "b0fdd63d58b18d663fbdf70e049f00a22c8e42be082203be7f26589213cd75ea" dependencies = [ + "deranged", "itoa", "libc", "num_threads", @@ -8465,15 +8475,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" +checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" [[package]] name = "time-macros" -version = "0.2.6" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d967f99f534ca7e495c575c62638eebc2898a8c84c119b89e250477bc4ba16b2" +checksum = "eb71511c991639bb078fd5bf97757e03914361c48100d52878b8e52b46fb92cd" dependencies = [ "time-core", ] diff --git a/Cargo.toml b/Cargo.toml index 671677c89b9c3..bab3d9787eb90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -287,7 +287,7 @@ lru = { version = "0.11.0", default-features = false, optional = true } maxminddb = { version = "0.23.0", default-features = false, optional = true } md-5 = { version = "0.10", default-features = false, optional = true } mongodb = { version = "2.6.0", default-features = false, features = ["tokio-runtime"], optional = true } -async-nats = { version = "0.24.0", default-features = false, optional = true } +async-nats = { version = "0.28.0", default-features = false, optional = true } nkeys = { version = "0.3.1", default-features = false, optional = true } nom = { version = "7.1.3", default-features = false, optional = true } notify = { version = "6.0.1", default-features = false, features = ["macos_fsevent"] } From 6c2212bc6a9cf6e16ac58c3faecff296afb67ad2 Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Sun, 6 Aug 2023 16:38:38 +0200 Subject: [PATCH 04/16] Bump to latest git revision --- Cargo.lock | 76 +++++++++++++-------------------------------- Cargo.toml | 2 +- src/sinks/nats.rs | 2 +- src/sources/nats.rs | 4 +-- 4 files changed, 25 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 47d57c4240e84..5be6fff25d4ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -648,34 +648,33 @@ dependencies = [ [[package]] name = "async-nats" -version = "0.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ddf2d540de89dc290f7abdd8a3889af74083eaf677ad845718542670d792847" +version = "0.31.0" +source = "git+https://github.com/nats-io/nats.rs.git?rev=a59b21eabceef45736f6f7ca987987e8c3edb805#a59b21eabceef45736f6f7ca987987e8c3edb805" dependencies = [ - "base64 0.13.1", - "base64-url", + "base64 0.21.2", "bytes 1.4.0", "futures 0.3.28", "http", - "itertools 0.10.5", "itoa", - "lazy_static", - "nkeys 0.2.0", + "memchr", + "nkeys", "nuid", "once_cell", + "rand 0.8.5", "regex", "ring", "rustls-native-certs", "rustls-pemfile", + "rustls-webpki", "serde", "serde_json", "serde_nanos", "serde_repr", - "subslice", + "thiserror", "time", "tokio", "tokio-retry", - "tokio-rustls 0.23.4", + "tokio-rustls 0.24.0", "tracing 0.1.37", "url", ] @@ -1499,15 +1498,6 @@ dependencies = [ "simd-abstraction", ] -[[package]] -name = "base64-url" -version = "1.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67a99c239d0c7e77c85dddfa9cebce48704b3c49550fcd3b84dd637e4484899f" -dependencies = [ - "base64 0.13.1", -] - [[package]] name = "base64ct" version = "1.5.3" @@ -4236,7 +4226,7 @@ checksum = "0646026eb1b3eea4cd9ba47912ea5ce9cc07713d105b1a14698f4e6433d348b7" dependencies = [ "http", "hyper", - "rustls 0.21.0", + "rustls 0.21.6", "tokio", "tokio-rustls 0.24.0", ] @@ -5436,21 +5426,6 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "nkeys" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e66a7cd1358277b2a6f77078e70aea7315ff2f20db969cc61153103ec162594" -dependencies = [ - "byteorder", - "data-encoding", - "ed25519-dalek", - "getrandom 0.2.10", - "log", - "rand 0.8.5", - "signatory", -] - [[package]] name = "nkeys" version = "0.3.1" @@ -5545,11 +5520,11 @@ dependencies = [ [[package]] name = "nuid" -version = "0.3.2" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20c1bb65186718d348306bf1afdeb20d9ab45b2ab80fb793c0fdcf59ffbb4f38" +checksum = "0b61b1710432e483e6a67b20b6c60c6afe0e2fad67aabba3bdb912f3f70ff6ae" dependencies = [ - "lazy_static", + "once_cell", "rand 0.8.5", ] @@ -7075,7 +7050,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.0", + "rustls 0.21.6", "rustls-pemfile", "serde", "serde_json", @@ -7319,9 +7294,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.0" +version = "0.21.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07180898a28ed6a7f7ba2311594308f595e3dd2e3c3812fa0a80a47b45f17e5d" +checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb" dependencies = [ "log", "ring", @@ -7352,9 +7327,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.100.1" +version = "0.101.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6207cd5ed3d8dca7816f8f3725513a34609c0c765bf652b8c3cb4cfd87db46b" +checksum = "513722fd73ad80a71f72b61009ea1b584bcfa1483ca93949c8f290298837fa59" dependencies = [ "ring", "untrusted", @@ -8199,15 +8174,6 @@ dependencies = [ "syn 2.0.28", ] -[[package]] -name = "subslice" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a8e4809a3bb02de01f1f7faf1ba01a83af9e8eabcd4d31dd6e413d14d56aae" -dependencies = [ - "memchr", -] - [[package]] name = "subtle" version = "2.4.1" @@ -8649,7 +8615,7 @@ version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e0d409377ff5b1e3ca6437aa86c1eb7d40c134bfec254e44c830defa92669db5" dependencies = [ - "rustls 0.21.0", + "rustls 0.21.6", "tokio", ] @@ -8698,7 +8664,7 @@ checksum = "2b2dbec703c26b00d74844519606ef15d09a7d6857860f84ad223dec002ddea2" dependencies = [ "futures-util", "log", - "rustls 0.21.0", + "rustls 0.21.6", "tokio", "tungstenite 0.20.0", ] @@ -9546,7 +9512,7 @@ dependencies = [ "mlua", "mongodb", "nix 0.26.2", - "nkeys 0.3.1", + "nkeys", "nom", "notify", "num-format", diff --git a/Cargo.toml b/Cargo.toml index bab3d9787eb90..efce0d580245f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -287,7 +287,7 @@ lru = { version = "0.11.0", default-features = false, optional = true } maxminddb = { version = "0.23.0", default-features = false, optional = true } md-5 = { version = "0.10", default-features = false, optional = true } mongodb = { version = "2.6.0", default-features = false, features = ["tokio-runtime"], optional = true } -async-nats = { version = "0.28.0", default-features = false, optional = true } +async-nats = { git = "https://github.com/nats-io/nats.rs.git", rev = "a59b21eabceef45736f6f7ca987987e8c3edb805", default-features = false, optional = true } nkeys = { version = "0.3.1", default-features = false, optional = true } nom = { version = "7.1.3", default-features = false, optional = true } notify = { version = "6.0.1", default-features = false, features = ["macos_fsevent"] } diff --git a/src/sinks/nats.rs b/src/sinks/nats.rs index f8e93f743553d..3b2c076f6eb65 100644 --- a/src/sinks/nats.rs +++ b/src/sinks/nats.rs @@ -33,7 +33,7 @@ enum BuildError { #[snafu(display("NATS Config Error: {}", source))] Config { source: NatsConfigError }, #[snafu(display("NATS Connect Error: {}", source))] - Connect { source: std::io::Error }, + Connect { source: async_nats::ConnectError }, } /** diff --git a/src/sources/nats.rs b/src/sources/nats.rs index 1352d020bc93f..0d6d37f2e7694 100644 --- a/src/sources/nats.rs +++ b/src/sources/nats.rs @@ -31,9 +31,9 @@ enum BuildError { #[snafu(display("NATS Config Error: {}", source))] Config { source: NatsConfigError }, #[snafu(display("NATS Connect Error: {}", source))] - Connect { source: std::io::Error }, + Connect { source: async_nats::ConnectError }, #[snafu(display("NATS Subscribe Error: {}", source))] - Subscribe { source: async_nats::Error }, + Subscribe { source: async_nats::SubscribeError }, } /// Configuration for the `nats` source. From dd9e3b72a0deb90743829970e5bc0482277bba37 Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Mon, 7 Aug 2023 17:22:39 +0200 Subject: [PATCH 05/16] cargo vdev build licenses --- LICENSE-3rdparty.csv | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 8c11eae6a04f2..86e67efe0648f 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -35,6 +35,7 @@ async-global-executor,https://github.com/Keruspe/async-global-executor,Apache-2. async-graphql,https://github.com/async-graphql/async-graphql,MIT OR Apache-2.0,"sunli , Koxiaet" async-io,https://github.com/smol-rs/async-io,Apache-2.0 OR MIT,Stjepan Glavina async-lock,https://github.com/smol-rs/async-lock,Apache-2.0 OR MIT,Stjepan Glavina +async-nats,https://github.com/nats-io/nats.rs,Apache-2.0,"Tomasz Pietrek , Casper Beyer " async-net,https://github.com/smol-rs/async-net,Apache-2.0 OR MIT,Stjepan Glavina async-process,https://github.com/smol-rs/async-process,Apache-2.0 OR MIT,Stjepan Glavina async-reactor-trait,https://github.com/amqp-rs/reactor-trait,Apache-2.0 OR MIT,Marc-Antoine Perennou @@ -82,7 +83,6 @@ backtrace,https://github.com/rust-lang/backtrace-rs,MIT OR Apache-2.0,The Rust P base16,https://github.com/thomcc/rust-base16,CC0-1.0,Thom Chiovoloni base64,https://github.com/marshallpierce/rust-base64,MIT OR Apache-2.0,"Alice Maz , Marshall Pierce " base64-simd,https://github.com/Nugine/simd,MIT,The base64-simd Authors -base64-url,https://github.com/magiclen/base64-url,MIT,Magic Len base64ct,https://github.com/RustCrypto/formats/tree/master/base64ct,Apache-2.0 OR MIT,RustCrypto Developers bit-set,https://github.com/contain-rs/bit-set,MIT OR Apache-2.0,Alexis Beingessner bit-vec,https://github.com/contain-rs/bit-vec,MIT OR Apache-2.0,Alexis Beingessner @@ -143,7 +143,6 @@ crc,https://github.com/mrhooray/crc-rs,MIT OR Apache-2.0,"Rui Hu crc32c,https://github.com/zowens/crc32c,Apache-2.0 OR MIT,Zack Owens crc32fast,https://github.com/srijs/rust-crc32fast,MIT OR Apache-2.0,"Sam Rijs , Alex Crichton " -crossbeam-channel,https://github.com/crossbeam-rs/crossbeam,MIT OR Apache-2.0,The crossbeam-channel Authors crossbeam-epoch,https://github.com/crossbeam-rs/crossbeam,MIT OR Apache-2.0,The crossbeam-epoch Authors crossbeam-queue,https://github.com/crossbeam-rs/crossbeam,MIT OR Apache-2.0,The crossbeam-queue Authors crossbeam-utils,https://github.com/crossbeam-rs/crossbeam,MIT OR Apache-2.0,The crossbeam-utils Authors @@ -163,6 +162,7 @@ data-encoding,https://github.com/ia0/data-encoding,MIT,Julien Cretin debug-helper,https://github.com/magiclen/debug-helper,MIT,Magic Len der,https://github.com/RustCrypto/formats/tree/master/der,Apache-2.0 OR MIT,RustCrypto Developers +deranged,https://github.com/jhpratt/deranged,MIT OR Apache-2.0,Jacob Pratt derivative,https://github.com/mcarton/rust-derivative,MIT OR Apache-2.0,mcarton derive_arbitrary,https://github.com/rust-fuzz/arbitrary,MIT OR Apache-2.0,"The Rust-Fuzz Project Developers, Nick Fitzgerald , Manish Goregaokar , Andre Bogus , Corey Farwell " derive_more,https://github.com/JelteF/derive_more,MIT,Jelte Fennema @@ -283,7 +283,6 @@ itoa,https://github.com/dtolnay/itoa,MIT OR Apache-2.0,David Tolnay jni-sys,https://github.com/sfackler/rust-jni-sys,MIT OR Apache-2.0,Steven Fackler js-sys,https://github.com/rustwasm/wasm-bindgen/tree/master/crates/js-sys,MIT OR Apache-2.0,The wasm-bindgen Developers -json,https://github.com/maciejhirsz/json-rust,MIT OR Apache-2.0,Maciej Hirsz json-patch,https://github.com/idubrov/json-patch,MIT OR Apache-2.0,Ivan Dubrov jsonpath_lib,https://github.com/freestrings/jsonpath,MIT,Changseok Han k8s-openapi,https://github.com/Arnavion/k8s-openapi,Apache-2.0,Arnavion @@ -336,7 +335,6 @@ mlua,https://github.com/khvzak/mlua,MIT,"Aleksandr Orlenko , kyren mongodb,https://github.com/mongodb/mongo-rust-driver,Apache-2.0,"Saghm Rossi , Patrick Freed , Isabel Atkinson , Abraham Egnor , Kaitlin Mahar " multer,https://github.com/rousan/multer-rs,MIT,Rousan Ali native-tls,https://github.com/sfackler/rust-native-tls,MIT OR Apache-2.0,Steven Fackler -nats,https://github.com/nats-io/nats.rs,Apache-2.0,"Derek Collison , Tyler Neely , Stjepan Glavina " ndk-context,https://github.com/rust-windowing/android-ndk-rs,MIT OR Apache-2.0,The Rust Windowing contributors nibble_vec,https://github.com/michaelsproul/rust_nibble_vec,MIT,Michael Sproul nix,https://github.com/nix-rust/nix,MIT,The nix-rust Project Developers @@ -456,10 +454,8 @@ rustc-hash,https://github.com/rust-lang-nursery/rustc-hash,Apache-2.0 OR MIT,The rustc_version,https://github.com/Kimundi/rustc-version-rs,MIT OR Apache-2.0,Marvin Löbel rustc_version_runtime,https://github.com/seppo0010/rustc-version-runtime-rs,MIT,Sebastian Waisbrot rustix,https://github.com/bytecodealliance/rustix,Apache-2.0 WITH LLVM-exception OR Apache-2.0 OR MIT,"Dan Gohman , Jakub Konka " -rustls,https://github.com/ctz/rustls,Apache-2.0 OR ISC OR MIT,Joseph Birr-Pixton rustls,https://github.com/rustls/rustls,Apache-2.0 OR ISC OR MIT,The rustls Authors rustls-native-certs,https://github.com/ctz/rustls-native-certs,Apache-2.0 OR ISC OR MIT,Joseph Birr-Pixton -rustls-pemfile,https://github.com/rustls/pemfile,Apache-2.0 OR ISC OR MIT,Joseph Birr-Pixton rustls-pemfile,https://github.com/rustls/pemfile,Apache-2.0 OR ISC OR MIT,The rustls-pemfile Authors rustls-webpki,https://github.com/rustls/webpki,ISC,The rustls-webpki Authors rustversion,https://github.com/dtolnay/rustversion,MIT OR Apache-2.0,David Tolnay From 4f52c6dafa42a9527eca04dc79607267b3d19c31 Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Tue, 8 Aug 2023 18:06:06 +0200 Subject: [PATCH 06/16] Use 0.31.0 for now --- Cargo.lock | 9 +++++---- Cargo.toml | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5be6fff25d4ce..4f359641f93c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -649,7 +649,8 @@ dependencies = [ [[package]] name = "async-nats" version = "0.31.0" -source = "git+https://github.com/nats-io/nats.rs.git?rev=a59b21eabceef45736f6f7ca987987e8c3edb805#a59b21eabceef45736f6f7ca987987e8c3edb805" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8257238e2a3629ee5618502a75d1b91f8017c24638c75349fc8d2d80cf1f7c4c" dependencies = [ "base64 0.21.2", "bytes 1.4.0", @@ -5520,11 +5521,11 @@ dependencies = [ [[package]] name = "nuid" -version = "0.4.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b61b1710432e483e6a67b20b6c60c6afe0e2fad67aabba3bdb912f3f70ff6ae" +checksum = "20c1bb65186718d348306bf1afdeb20d9ab45b2ab80fb793c0fdcf59ffbb4f38" dependencies = [ - "once_cell", + "lazy_static", "rand 0.8.5", ] diff --git a/Cargo.toml b/Cargo.toml index efce0d580245f..356dfb6bdd082 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -287,7 +287,7 @@ lru = { version = "0.11.0", default-features = false, optional = true } maxminddb = { version = "0.23.0", default-features = false, optional = true } md-5 = { version = "0.10", default-features = false, optional = true } mongodb = { version = "2.6.0", default-features = false, features = ["tokio-runtime"], optional = true } -async-nats = { git = "https://github.com/nats-io/nats.rs.git", rev = "a59b21eabceef45736f6f7ca987987e8c3edb805", default-features = false, optional = true } +async-nats = { version = "0.31.0", default-features = false, optional = true } nkeys = { version = "0.3.1", default-features = false, optional = true } nom = { version = "7.1.3", default-features = false, optional = true } notify = { version = "6.0.1", default-features = false, features = ["macos_fsevent"] } From 365d10a66bfd043d67c56906d0f8dbe8ca26786b Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Wed, 9 Aug 2023 14:45:07 +0200 Subject: [PATCH 07/16] bump client_capacity to 1024 --- src/nats.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/nats.rs b/src/nats.rs index f9b3246455005..0a01d62cfd799 100644 --- a/src/nats.rs +++ b/src/nats.rs @@ -152,7 +152,7 @@ pub(crate) fn from_tls_auth_config( Some(auth) => auth.to_nats_options()?, }; - let nats_options = nats_options.name(connection_name); + let nats_options = nats_options.client_capacity(1024).name(connection_name); match tls_config { None => Ok(nats_options), From b1fe741c55f586a6f405a0035006b18ad11b9743 Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Wed, 9 Aug 2023 15:05:13 +0200 Subject: [PATCH 08/16] Replace std::thread::sleep with tokio::time::sleep --- src/sinks/nats.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sinks/nats.rs b/src/sinks/nats.rs index 3b2c076f6eb65..a190d54d914eb 100644 --- a/src/sinks/nats.rs +++ b/src/sinks/nats.rs @@ -251,7 +251,7 @@ mod tests { #[cfg(test)] mod integration_tests { use codecs::TextSerializerConfig; - use std::{thread, time::Duration}; + use std::time::Duration; use super::*; use crate::nats::{NatsAuthCredentialsFile, NatsAuthNKey, NatsAuthToken, NatsAuthUserPassword}; @@ -291,7 +291,7 @@ mod integration_tests { run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await; // Unsubscribe from the channel. - thread::sleep(Duration::from_secs(3)); + tokio::time::sleep(Duration::from_secs(3)).await; sub.unsubscribe().await.unwrap(); let mut output: Vec = Vec::new(); From f30a61a7eca01449c66061334fc2239f021a8309 Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Wed, 9 Aug 2023 15:14:26 +0200 Subject: [PATCH 09/16] Revert "bump client_capacity to 1024" This reverts commit 365d10a66bfd043d67c56906d0f8dbe8ca26786b. --- src/nats.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/nats.rs b/src/nats.rs index 0a01d62cfd799..f9b3246455005 100644 --- a/src/nats.rs +++ b/src/nats.rs @@ -152,7 +152,7 @@ pub(crate) fn from_tls_auth_config( Some(auth) => auth.to_nats_options()?, }; - let nats_options = nats_options.client_capacity(1024).name(connection_name); + let nats_options = nats_options.name(connection_name); match tls_config { None => Ok(nats_options), From c94f1a1e1ec2a8315838fb568639fb4066b4beb5 Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Wed, 9 Aug 2023 15:46:24 +0200 Subject: [PATCH 10/16] Flush the connection before finishing NatsSink --- src/sinks/nats.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/sinks/nats.rs b/src/sinks/nats.rs index a190d54d914eb..469e7a52c86c1 100644 --- a/src/sinks/nats.rs +++ b/src/sinks/nats.rs @@ -233,6 +233,8 @@ impl StreamSink for NatsSink { } } + let _ = self.connection.flush().await; + Ok(()) } } From 2f8987002a5f3841c7d2e66648844c706852bebd Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Wed, 9 Aug 2023 15:51:28 +0200 Subject: [PATCH 11/16] Flush subscribe before proceeding with the test --- src/sinks/nats.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sinks/nats.rs b/src/sinks/nats.rs index 469e7a52c86c1..b7ead82415384 100644 --- a/src/sinks/nats.rs +++ b/src/sinks/nats.rs @@ -285,6 +285,7 @@ mod integration_tests { .subscribe(subject) .await .expect("failed to subscribe with test consumer"); + consumer.flush().await.expect("failed to flush with the test consumer"); // Publish events. let num_events = 1_000; From 27dbdf0c0ca609394aeb665b99211638e19e0ee3 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Thu, 10 Aug 2023 08:50:59 +0200 Subject: [PATCH 12/16] Fix NATS NKey auth Port to async-nats was using NKey public key when calling `with_nkey`. It should use the `seed`, which will be used to generate key pair. Signed-off-by: Tomasz Pietrek --- src/nats.rs | 5 ++--- src/sinks/nats.rs | 2 +- src/sources/nats.rs | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/nats.rs b/src/nats.rs index f9b3246455005..ddac306355274 100644 --- a/src/nats.rs +++ b/src/nats.rs @@ -132,9 +132,8 @@ impl NatsAuthConfig { ) .context(CredentialsFileSnafu) } - NatsAuthConfig::Nkey { nkey } => nkeys::KeyPair::from_seed(&nkey.seed) - .context(AuthConfigSnafu) - .map(|_kp| async_nats::ConnectOptions::with_nkey(nkey.nkey.clone())), + NatsAuthConfig::Nkey { nkey } => + Ok(async_nats::ConnectOptions::with_nkey(nkey.seed.clone())), NatsAuthConfig::Token { token } => Ok(async_nats::ConnectOptions::with_token( token.value.inner().to_string(), )), diff --git a/src/sinks/nats.rs b/src/sinks/nats.rs index b7ead82415384..64099bc93f2c6 100644 --- a/src/sinks/nats.rs +++ b/src/sinks/nats.rs @@ -509,7 +509,7 @@ mod integration_tests { let r = publish_and_check(conf).await; assert!( - matches!(r, Err(BuildError::Config { .. })), + matches!(r, Err(BuildError::Connect { .. })), "publish_and_check failed, expected BuildError::Config, got: {:?}", r ); diff --git a/src/sources/nats.rs b/src/sources/nats.rs index 0d6d37f2e7694..2549a4771e40f 100644 --- a/src/sources/nats.rs +++ b/src/sources/nats.rs @@ -609,7 +609,7 @@ mod integration_tests { let r = publish_and_check(conf).await; assert!( - matches!(r, Err(BuildError::Config { .. })), + matches!(r, Err(BuildError::Connect { .. })), "publish_and_check failed, expected BuildError::Config, got: {:?}", r ); From 3fba0d557d1a9c6cf75d7695b80f7ba86609aeb9 Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Fri, 11 Aug 2023 09:15:32 +0200 Subject: [PATCH 13/16] Flush after every publish --- src/internal_events/nats.rs | 2 +- src/sinks/nats.rs | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/internal_events/nats.rs b/src/internal_events/nats.rs index 9a2940c1744ed..c68c756a3a377 100644 --- a/src/internal_events/nats.rs +++ b/src/internal_events/nats.rs @@ -7,7 +7,7 @@ use vector_core::internal_event::InternalEvent; #[derive(Debug)] pub struct NatsEventSendError { - pub error: async_nats::PublishError, + pub error: async_nats::Error, } impl InternalEvent for NatsEventSendError { diff --git a/src/sinks/nats.rs b/src/sinks/nats.rs index 64099bc93f2c6..02265df21972b 100644 --- a/src/sinks/nats.rs +++ b/src/sinks/nats.rs @@ -217,6 +217,8 @@ impl StreamSink for NatsSink { match self .connection .publish(subject.clone(), bytes.freeze()) + .map_err(Into::into) + .and_then(|_| self.connection.flush().map_err(Into::into)) .await { Err(error) => { @@ -233,8 +235,6 @@ impl StreamSink for NatsSink { } } - let _ = self.connection.flush().await; - Ok(()) } } @@ -285,7 +285,10 @@ mod integration_tests { .subscribe(subject) .await .expect("failed to subscribe with test consumer"); - consumer.flush().await.expect("failed to flush with the test consumer"); + consumer + .flush() + .await + .expect("failed to flush with the test consumer"); // Publish events. let num_events = 1_000; From 53c6e6bd6abf055a1188fb336f9dc5fb33b65053 Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Fri, 11 Aug 2023 09:26:13 +0200 Subject: [PATCH 14/16] fmt --- src/nats.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/nats.rs b/src/nats.rs index ddac306355274..5d20516d49939 100644 --- a/src/nats.rs +++ b/src/nats.rs @@ -132,8 +132,9 @@ impl NatsAuthConfig { ) .context(CredentialsFileSnafu) } - NatsAuthConfig::Nkey { nkey } => - Ok(async_nats::ConnectOptions::with_nkey(nkey.seed.clone())), + NatsAuthConfig::Nkey { nkey } => { + Ok(async_nats::ConnectOptions::with_nkey(nkey.seed.clone())) + } NatsAuthConfig::Token { token } => Ok(async_nats::ConnectOptions::with_token( token.value.inner().to_string(), )), From 59b070966cc809be9f049ac81e09ddc5d4a490fc Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Fri, 11 Aug 2023 14:05:40 +0200 Subject: [PATCH 15/16] downgrade time to 0.3.23 --- Cargo.lock | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0b37d378bd431..fe7e40d8c994b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2817,15 +2817,6 @@ dependencies = [ "const-oid", ] -[[package]] -name = "deranged" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7684a49fb1af197853ef7b2ee694bc1f5b4179556f1e5710e1760c5db6f5e929" -dependencies = [ - "serde", -] - [[package]] name = "derivative" version = "2.2.0" @@ -8418,11 +8409,10 @@ dependencies = [ [[package]] name = "time" -version = "0.3.25" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0fdd63d58b18d663fbdf70e049f00a22c8e42be082203be7f26589213cd75ea" +checksum = "59e399c068f43a5d116fedaf73b203fa4f9c519f17e2b34f63221d3792f81446" dependencies = [ - "deranged", "itoa", "libc", "num_threads", @@ -8439,9 +8429,9 @@ checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" [[package]] name = "time-macros" -version = "0.2.11" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb71511c991639bb078fd5bf97757e03914361c48100d52878b8e52b46fb92cd" +checksum = "96ba15a897f3c86766b757e5ac7221554c6750054d74d5b28844fce5fb36a6c4" dependencies = [ "time-core", ] From 5ca559fe4a149294127af0d52ef6989968426156 Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Fri, 11 Aug 2023 14:43:33 +0200 Subject: [PATCH 16/16] cargo vdev build licenses --- LICENSE-3rdparty.csv | 1 - 1 file changed, 1 deletion(-) diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index b552db03d53f1..c99a0ece7a912 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -162,7 +162,6 @@ data-encoding,https://github.com/ia0/data-encoding,MIT,Julien Cretin debug-helper,https://github.com/magiclen/debug-helper,MIT,Magic Len der,https://github.com/RustCrypto/formats/tree/master/der,Apache-2.0 OR MIT,RustCrypto Developers -deranged,https://github.com/jhpratt/deranged,MIT OR Apache-2.0,Jacob Pratt derivative,https://github.com/mcarton/rust-derivative,MIT OR Apache-2.0,mcarton derive_arbitrary,https://github.com/rust-fuzz/arbitrary,MIT OR Apache-2.0,"The Rust-Fuzz Project Developers, Nick Fitzgerald , Manish Goregaokar , Andre Bogus , Corey Farwell " derive_more,https://github.com/JelteF/derive_more,MIT,Jelte Fennema