diff --git a/Cargo.lock b/Cargo.lock index a5f635dafc5cf..ed27bc0aa8862 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -637,6 +637,22 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "awaitable" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70af449c9a763cb655c6a1e5338b42d99c67190824ff90658c1e30be844c0775" +dependencies = [ + "awaitable-error", + "cfg-if", +] + +[[package]] +name = "awaitable-error" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5b3469636cdf8543cceab175efca534471f36eee12fb8374aba00eb5e7e7f8a" + [[package]] name = "aws-config" version = "0.54.1" @@ -2054,6 +2070,17 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "concurrent_arena" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c529c2d4ecc249ae15d317c9a8b9e7d86f87e80d4417de6cfa8f4d6030f37daf" +dependencies = [ + "arc-swap", + "parking_lot", + "triomphe", +] + [[package]] name = "confy" version = "0.5.1" @@ -2645,6 +2672,17 @@ dependencies = [ "syn 2.0.27", ] +[[package]] +name = "derive_destructure2" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35cb7e5875e1028a73e551747d6d0118f25c3d6dbba2dadf97cc0f4d0c53f2f5" +dependencies = [ + "proc-macro2 1.0.66", + "quote 1.0.32", + "syn 1.0.109", +] + [[package]] name = "derive_more" version = "0.99.17" @@ -2717,6 +2755,15 @@ dependencies = [ "dirs-sys 0.3.7", ] +[[package]] +name = "dirs" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" +dependencies = [ + "dirs-sys 0.4.1", +] + [[package]] name = "dirs-next" version = "2.0.0" @@ -5401,6 +5448,17 @@ dependencies = [ "num-traits", ] +[[package]] +name = "num-derive" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d" +dependencies = [ + "proc-macro2 1.0.66", + "quote 1.0.32", + "syn 1.0.109", +] + [[package]] name = "num-format" version = "0.4.4" @@ -5610,6 +5668,7 @@ dependencies = [ "base64 0.21.2", "bytes 1.4.0", "chrono", + "dirs 5.0.1", "flagset", "futures 0.3.28", "http", @@ -5617,6 +5676,8 @@ dependencies = [ "log", "md-5", "once_cell", + "openssh", + "openssh-sftp-client", "parking_lot", "percent-encoding", "pin-project", @@ -5652,6 +5713,100 @@ dependencies = [ "url", ] +[[package]] +name = "openssh" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ca6c277973fb549b36dd8980941b5ea3ecebea026f5b1f0060acde74d893c22" +dependencies = [ + "dirs 4.0.0", + "libc", + "once_cell", + "shell-escape", + "tempfile", + "thiserror", + "tokio", + "tokio-pipe", +] + +[[package]] +name = "openssh-sftp-client" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a88d72ccea61738948673f736b3c7f0abef0c268e61622ef61f069c963aba46" +dependencies = [ + "bytes 1.4.0", + "derive_destructure2", + "futures-core", + "once_cell", + "openssh", + "openssh-sftp-client-lowlevel", + "openssh-sftp-error", + "pin-project", + "scopeguard", + "tokio", + "tokio-io-utility", + "tokio-util", + "tracing 0.1.37", +] + +[[package]] +name = "openssh-sftp-client-lowlevel" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4975d0a824e82d4f61e3edf870254ce97bd7f8154751d2afdd97c7f43e57dff" +dependencies = [ + "awaitable", + "bytes 1.4.0", + "concurrent_arena", + "derive_destructure2", + "openssh-sftp-error", + "openssh-sftp-protocol", + "pin-project", + "tokio", + "tokio-io-utility", +] + +[[package]] +name = "openssh-sftp-error" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c3356e914b8006417188efd534105d5bcb230b4a9fd67782a6b4a4e15fa006" +dependencies = [ + "awaitable-error", + "openssh", + "openssh-sftp-protocol-error", + "ssh_format_error", + "thiserror", + "tokio", +] + +[[package]] +name = "openssh-sftp-protocol" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf38532d784978966f95d241226223823f351d5bb2a4bebcf6b20b9cb1e393e0" +dependencies = [ + "bitflags 2.3.2", + "num-derive", + "num-traits", + "openssh-sftp-protocol-error", + "serde", + "ssh_format", + "vec-strings", +] + +[[package]] +name = "openssh-sftp-protocol-error" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0719269eb3f037866ae07ec89cb44ed2c1d63b72b2390cef8e1aa3016a956ff8" +dependencies = [ + "serde", + "thiserror", + "vec-strings", +] + [[package]] name = "openssl" version = "0.10.55" @@ -7715,6 +7870,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "shell-escape" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45bb67a18fa91266cc7807181f62f9178a6873bfad7dc788c42e6430db40184f" + [[package]] name = "signal-hook" version = "0.3.14" @@ -7936,6 +8097,32 @@ dependencies = [ "der", ] +[[package]] +name = "ssh_format" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24ab31081d1c9097c327ec23550858cb5ffb4af6b866c1ef4d728455f01f3304" +dependencies = [ + "bytes 1.4.0", + "serde", + "ssh_format_error", +] + +[[package]] +name = "ssh_format_error" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be3c6519de7ca611f71ef7e8a56eb57aa1c818fecb5242d0a0f39c83776c210c" +dependencies = [ + "serde", +] + +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "static_assertions" version = "1.1.0" @@ -8246,6 +8433,12 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "thin-vec" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aac81b6fd6beb5884b0cf3321b8117e6e5d47ecb6fc89f414cfdcca8b2fe2dd8" + [[package]] name = "thiserror" version = "1.0.44" @@ -8401,6 +8594,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-io-utility" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d672654d175710e52c7c41f6aec77c62b3c0954e2a7ebce9049d1e94ed7c263" +dependencies = [ + "bytes 1.4.0", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.1.0" @@ -8434,6 +8637,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-pipe" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f213a84bffbd61b8fa0ba8a044b4bbe35d471d0b518867181e82bd5c15542784" +dependencies = [ + "libc", + "tokio", +] + [[package]] name = "tokio-postgres" version = "0.7.7" @@ -8860,6 +9073,17 @@ dependencies = [ "serde_json", ] +[[package]] +name = "triomphe" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee8098afad3fb0c54a9007aab6804558410503ad676d4633f9c2559a00ac0f" +dependencies = [ + "arc-swap", + "serde", + "stable_deref_trait", +] + [[package]] name = "trust-dns-proto" version = "0.21.2" @@ -9255,6 +9479,16 @@ dependencies = [ "toml 0.7.6", ] +[[package]] +name = "vec-strings" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8509489e2a7ee219522238ad45fd370bec6808811ac15ac6b07453804e77659" +dependencies = [ + "serde", + "thin-vec", +] + [[package]] name = "vec_map" version = "0.8.2" @@ -10038,7 +10272,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "579cc485bd5ce5bfa0d738e4921dd0b956eca9800be1fd2e5257ebe95bc4617e" dependencies = [ "core-foundation", - "dirs", + "dirs 4.0.0", "jni", "log", "ndk-context", diff --git a/Cargo.toml b/Cargo.toml index dc33ef1b6b109..62c32f275bfa3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,7 +151,7 @@ futures = { version = "0.3.28", default-features = false, features = ["compat", tokio = { version = "1.29.1", default-features = false, features = ["full"] } tokio-openssl = { version = "0.6.3", default-features = false } tokio-stream = { version = "0.1.14", default-features = false, features = ["net", "sync", "time"] } -tokio-util = { version = "0.7", default-features = false, features = ["io", "time"] } +tokio-util = { version = "0.7.8", default-features = false, features = ["io", "time"] } console-subscriber = { version = "0.1.10", default-features = false, optional = true } # Tracing @@ -645,6 +645,7 @@ sinks-logs = [ "sinks-vector", "sinks-webhdfs", "sinks-websocket", + "sinks-sftp", ] sinks-metrics = [ "sinks-appsignal", @@ -709,6 +710,7 @@ sinks-utils-udp = [] sinks-vector = ["sinks-utils-udp", "dep:tonic", "protobuf-build"] sinks-websocket = ["dep:tokio-tungstenite"] sinks-webhdfs = ["dep:opendal"] +sinks-sftp = ["dep:opendal", "opendal?/services-sftp"] # Datadog integration enterprise = [ diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index d349167f24f06..043a9e700e874 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -87,6 +87,8 @@ pub mod redis; pub mod s3_common; #[cfg(feature = "sinks-sematext")] pub mod sematext; +#[cfg(feature = "sinks-sftp")] +pub mod sftp; #[cfg(feature = "sinks-socket")] pub mod socket; #[cfg(feature = "sinks-splunk_hec")] diff --git a/src/sinks/sftp/config.rs b/src/sinks/sftp/config.rs new file mode 100644 index 0000000000000..63837e35f6809 --- /dev/null +++ b/src/sinks/sftp/config.rs @@ -0,0 +1,186 @@ +use codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoderConfig}; +use opendal::{layers::LoggingLayer, services::Sftp, Operator}; +use tower::ServiceBuilder; +use vector_config::configurable_component; +use vector_core::{ + config::{AcknowledgementsConfig, DataType, Input}, + sink::VectorSink, +}; + +use crate::{ + codecs::{Encoder, EncodingConfigWithFraming, SinkType}, + config::{GenerateConfig, SinkConfig, SinkContext}, + sinks::{ + opendal_common::*, + util::{ + partitioner::KeyPartitioner, BatchConfig, BulkSizeBasedDefaultBatchSettings, + Compression, + }, + Healthcheck, + }, +}; + +/// Configuration for the `sftp` sink. +#[configurable_component(sink("sftp", "Sftp."))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct SftpConfig { + /// The root path for Sftp. + /// + /// Must be a valid directory. + /// + /// The final file path is in the format of `{root}/{prefix}{suffix}`. + #[serde(default)] + pub root: String, + + /// A prefix to apply to all keys. + /// + /// Prefixes are useful for partitioning objects, such as by creating a blob key that + /// stores blobs under a particular directory. If using a prefix for this purpose, it must end + /// in `/` to act as a directory path. A trailing `/` is **not** automatically added. + /// + /// The final file path is in the format of `{root}/{prefix}{suffix}`. + #[serde(default)] + #[configurable(metadata(docs::templateable))] + pub prefix: String, + + /// The endpoint to connect to sftp. + #[serde(default)] + #[configurable(metadata(docs::examples = "127.0.0.1:22"))] + pub endpoint: String, + + /// The user to connect to sftp. + #[serde(default)] + #[configurable(metadata(docs::examples = "ubuntu"))] + pub user: String, + + /// The key path that sftp used to connect to sftp. + #[serde(default)] + #[configurable(metadata(docs::examples = "/path/to/ssh/key/path"))] + pub key: String, + + /// The known_hosts_strategy that sftp used to connect to sftp. + /// + /// Possible value includes: + /// + /// - Strict (default) + /// - Accept + /// - Add + #[serde(default)] + #[configurable(metadata(docs::examples = "strict"))] + pub known_hosts_strategy: String, + + #[serde(flatten)] + pub encoding: EncodingConfigWithFraming, + + #[configurable(derived)] + #[serde(default = "Compression::gzip_default")] + pub compression: Compression, + + #[configurable(derived)] + #[serde(default)] + pub batch: BatchConfig, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + pub acknowledgements: AcknowledgementsConfig, +} + +impl GenerateConfig for SftpConfig { + fn generate_config() -> toml::Value { + toml::Value::try_from(Self { + root: "/".to_string(), + prefix: "%F/".to_string(), + endpoint: "127.0.0.1:22".to_string(), + + user: "ubuntu".to_string(), + key: "/home/ubuntu/.ssh/id_rsa".to_string(), + known_hosts_strategy: "strict".to_string(), + encoding: ( + Some(NewlineDelimitedEncoderConfig::new()), + JsonSerializerConfig::default(), + ) + .into(), + compression: Compression::gzip_default(), + batch: BatchConfig::default(), + + acknowledgements: Default::default(), + }) + .unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "sftp")] +impl SinkConfig for SftpConfig { + async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let op = self.build_operator()?; + + let check_op = op.clone(); + let healthcheck = Box::pin(async move { Ok(check_op.check().await?) }); + + let sink = self.build_processor(op)?; + Ok((sink, healthcheck)) + } + + fn input(&self) -> Input { + Input::new(self.encoding.config().1.input_type() & DataType::Log) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +impl SftpConfig { + pub fn build_operator(&self) -> crate::Result { + // Build OpenDal Operator + let mut builder = Sftp::default(); + // Prefix logic will be handled by key_partitioner. + builder.root(&self.root); + builder.endpoint(&self.endpoint); + builder.user(&self.user); + builder.key(&self.key); + builder.known_hosts_strategy(&self.known_hosts_strategy); + + let op = Operator::new(builder)? + .layer(LoggingLayer::default()) + .finish(); + Ok(op) + } + + pub fn build_processor(&self, op: Operator) -> crate::Result { + // Configure our partitioning/batching. + let batcher_settings = self.batch.into_batcher_settings()?; + + let transformer = self.encoding.transformer(); + let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; + let encoder = Encoder::::new(framer, serializer); + + let request_builder = OpenDalRequestBuilder { + encoder: (transformer, encoder), + compression: self.compression, + }; + + // TODO: we can add tower middleware here. + let svc = ServiceBuilder::new().service(OpenDalService::new(op)); + + let sink = OpenDalSink::new( + svc, + request_builder, + self.key_partitioner()?, + batcher_settings, + ); + + Ok(VectorSink::from_event_streamsink(sink)) + } + + pub fn key_partitioner(&self) -> crate::Result { + let prefix = self.prefix.clone().try_into()?; + Ok(KeyPartitioner::new(prefix)) + } +} diff --git a/src/sinks/sftp/mod.rs b/src/sinks/sftp/mod.rs new file mode 100644 index 0000000000000..91abdff497b31 --- /dev/null +++ b/src/sinks/sftp/mod.rs @@ -0,0 +1,21 @@ +//! `sftp` sink. +//! +//! `sftp` SFTP, or Secure File Transfer Protocol, is a network protocol used for +//! securely transferring files over the internet. It operates over the Secure +//! Shell (SSH) data stream, providing secure file transfer by both encrypting +//! the data and maintaining the integrity of the transfer. SFTP also supports +//! file management operations like moving and deleting files on the server, unlike FTP. +//! +//! For more information, please refer to: +//! +//! - [sftp(1) — Linux manual page](https://man7.org/linux/man-pages/man1/sftp.1.html) +//! +//! `sftp` is an OpenDal based services. This mod itself only provide config to build an +//! [`crate::sinks::opendal_common::OpenDalSink`]. All real implement are powered by +//! [`crate::sinks::opendal_common::OpenDalSink`]. + +mod config; +pub use config::SftpConfig; + +#[cfg(test)] +mod test; diff --git a/src/sinks/sftp/test.rs b/src/sinks/sftp/test.rs new file mode 100644 index 0000000000000..eb67988b59104 --- /dev/null +++ b/src/sinks/sftp/test.rs @@ -0,0 +1,93 @@ +use bytes::Bytes; +use codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoderConfig}; +use vector_common::request_metadata::GroupedCountByteSize; +use vector_core::partition::Partitioner; + +use super::config::SftpConfig; +use crate::{ + codecs::{Encoder, EncodingConfigWithFraming, SinkType}, + event::LogEvent, + sinks::{ + opendal_common::{OpenDalRequest, OpenDalRequestBuilder}, + util::{ + request_builder::{EncodeResult, RequestBuilder}, + Compression, + }, + }, +}; + +fn default_config(encoding: EncodingConfigWithFraming) -> SftpConfig { + SftpConfig { + root: "/tmp/".to_string(), + prefix: "%F/".to_string(), + endpoint: "127.0.0.1:22".to_string(), + user: "ubuntu".to_string(), + key: "/home/ubuntu/.ssh/id_rsa".to_string(), + known_hosts_strategy: "strict".to_string(), + encoding, + compression: Compression::gzip_default(), + batch: Default::default(), + acknowledgements: Default::default(), + } +} + +#[test] +fn Sftp_generate_config() { + crate::test_util::test_generate_config::(); +} + +fn request_builder(sink_config: &SftpConfig) -> OpenDalRequestBuilder { + let transformer = sink_config.encoding.transformer(); + let (framer, serializer) = sink_config + .encoding + .build(SinkType::MessageBased) + .expect("encoding must build with success"); + let encoder = Encoder::::new(framer, serializer); + + OpenDalRequestBuilder { + encoder: (transformer, encoder), + compression: sink_config.compression, + } +} + +fn build_request(compression: Compression) -> OpenDalRequest { + let sink_config = SftpConfig { + compression, + ..default_config( + ( + Some(NewlineDelimitedEncoderConfig::new()), + JsonSerializerConfig::default(), + ) + .into(), + ) + }; + let log = LogEvent::default().into(); + let key = sink_config + .key_partitioner() + .unwrap() + .partition(&log) + .expect("key wasn't provided"); + let request_builder = request_builder(&sink_config); + let (metadata, metadata_request_builder, _events) = + request_builder.split_input((key, vec![log])); + let byte_size = GroupedCountByteSize::new_untagged(); + let payload = EncodeResult::uncompressed(Bytes::new(), byte_size); + let request_metadata = metadata_request_builder.build(&payload); + + request_builder.build_request(metadata, request_metadata, payload) +} + +#[test] +fn Sftp_build_request() { + let req = build_request(Compression::None); + assert!(req.metadata.partition_key.ends_with(".log")); + + let req = build_request(Compression::None); + assert!(req.metadata.partition_key.ends_with(".log")); + + let req = build_request(Compression::gzip_default()); + assert!(req.metadata.partition_key.ends_with(".log.gz")); + + let req = build_request(Compression::zlib_default()); + assert!(req.metadata.partition_key.ends_with(".log.zz")); +}