From 2a0754e1b831b0329341da86e069bf23d7963744 Mon Sep 17 00:00:00 2001 From: Glen Oakley Date: Thu, 14 Sep 2023 21:10:52 +0000 Subject: [PATCH 01/17] feat(codecs): add support for protobuf encoding --- lib/codecs/src/decoding/format/mod.rs | 2 +- lib/codecs/src/decoding/format/protobuf.rs | 4 +- lib/codecs/src/decoding/mod.rs | 2 +- lib/codecs/src/encoding/format/mod.rs | 2 + lib/codecs/src/encoding/format/protobuf.rs | 342 ++++++++++++++++++ lib/codecs/src/encoding/mod.rs | 34 +- .../tests/data/encoding/protobuf/test.desc | Bin 0 -> 457 bytes .../tests/data/encoding/protobuf/test.proto | 32 ++ src/codecs/encoding/config.rs | 5 + src/codecs/encoding/encoder.rs | 2 +- src/components/validation/resources/mod.rs | 17 +- src/sinks/websocket/sink.rs | 4 +- .../reference/components/sinks/base/amqp.cue | 26 ++ .../sinks/base/aws_cloudwatch_logs.cue | 26 ++ .../sinks/base/aws_kinesis_firehose.cue | 26 ++ .../sinks/base/aws_kinesis_streams.cue | 26 ++ .../components/sinks/base/aws_s3.cue | 26 ++ .../components/sinks/base/aws_sns.cue | 26 ++ .../components/sinks/base/aws_sqs.cue | 26 ++ .../components/sinks/base/azure_blob.cue | 26 ++ .../components/sinks/base/console.cue | 26 ++ .../reference/components/sinks/base/file.cue | 26 ++ .../sinks/base/gcp_chronicle_unstructured.cue | 26 ++ .../sinks/base/gcp_cloud_storage.cue | 26 ++ .../components/sinks/base/gcp_pubsub.cue | 26 ++ .../reference/components/sinks/base/http.cue | 26 ++ .../components/sinks/base/humio_logs.cue | 26 ++ .../reference/components/sinks/base/kafka.cue | 26 ++ .../reference/components/sinks/base/loki.cue | 26 ++ .../reference/components/sinks/base/nats.cue | 26 ++ .../components/sinks/base/papertrail.cue | 26 ++ .../components/sinks/base/pulsar.cue | 26 ++ .../reference/components/sinks/base/redis.cue | 26 ++ .../components/sinks/base/socket.cue | 26 ++ .../components/sinks/base/splunk_hec_logs.cue | 26 ++ .../components/sinks/base/webhdfs.cue | 26 ++ .../components/sinks/base/websocket.cue | 26 ++ 37 files changed, 1084 insertions(+), 12 deletions(-) create mode 100644 lib/codecs/src/encoding/format/protobuf.rs create mode 100644 lib/codecs/tests/data/encoding/protobuf/test.desc create mode 100644 lib/codecs/tests/data/encoding/protobuf/test.proto diff --git a/lib/codecs/src/decoding/format/mod.rs b/lib/codecs/src/decoding/format/mod.rs index 5172705448dc2..52c4557d7f57f 100644 --- a/lib/codecs/src/decoding/format/mod.rs +++ b/lib/codecs/src/decoding/format/mod.rs @@ -20,7 +20,7 @@ pub use native::{NativeDeserializer, NativeDeserializerConfig}; pub use native_json::{ NativeJsonDeserializer, NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, }; -pub use protobuf::{ProtobufDeserializer, ProtobufDeserializerConfig}; +pub use protobuf::{ProtobufDeserializer, ProtobufDeserializerConfig, ProtobufDeserializerOptions}; use smallvec::SmallVec; #[cfg(feature = "syslog")] pub use syslog::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions}; diff --git a/lib/codecs/src/decoding/format/protobuf.rs b/lib/codecs/src/decoding/format/protobuf.rs index 71bd98c1567fa..0c55478189b31 100644 --- a/lib/codecs/src/decoding/format/protobuf.rs +++ b/lib/codecs/src/decoding/format/protobuf.rs @@ -73,10 +73,10 @@ impl ProtobufDeserializerConfig { #[derivative(Default)] pub struct ProtobufDeserializerOptions { /// Path to desc file - desc_file: PathBuf, + pub desc_file: PathBuf, /// message type. e.g package.message - message_type: String, + pub message_type: String, } /// Deserializer that builds `Event`s from a byte frame containing protobuf. diff --git a/lib/codecs/src/decoding/mod.rs b/lib/codecs/src/decoding/mod.rs index 008de9d9d245e..94d0f7e2bccc5 100644 --- a/lib/codecs/src/decoding/mod.rs +++ b/lib/codecs/src/decoding/mod.rs @@ -12,7 +12,7 @@ pub use format::{ GelfDeserializerConfig, GelfDeserializerOptions, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions, NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer, NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer, - ProtobufDeserializerConfig, + ProtobufDeserializerConfig, ProtobufDeserializerOptions, }; #[cfg(feature = "syslog")] pub use format::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions}; diff --git a/lib/codecs/src/encoding/format/mod.rs b/lib/codecs/src/encoding/format/mod.rs index efff723f65b46..e61f7cae0bb96 100644 --- a/lib/codecs/src/encoding/format/mod.rs +++ b/lib/codecs/src/encoding/format/mod.rs @@ -11,6 +11,7 @@ mod json; mod logfmt; mod native; mod native_json; +mod protobuf; mod raw_message; mod text; @@ -24,6 +25,7 @@ pub use json::{JsonSerializer, JsonSerializerConfig}; pub use logfmt::{LogfmtSerializer, LogfmtSerializerConfig}; pub use native::{NativeSerializer, NativeSerializerConfig}; pub use native_json::{NativeJsonSerializer, NativeJsonSerializerConfig}; +pub use protobuf::{ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions}; pub use raw_message::{RawMessageSerializer, RawMessageSerializerConfig}; pub use text::{TextSerializer, TextSerializerConfig}; use vector_core::event::Event; diff --git a/lib/codecs/src/encoding/format/protobuf.rs b/lib/codecs/src/encoding/format/protobuf.rs new file mode 100644 index 0000000000000..6cdfcbcfa60a7 --- /dev/null +++ b/lib/codecs/src/encoding/format/protobuf.rs @@ -0,0 +1,342 @@ +use crate::encoding::BuildError; +use bytes::BytesMut; +use prost::Message; +use prost_reflect::{DescriptorPool, DynamicMessage, FieldDescriptor, Kind, MessageDescriptor}; +use std::path::{Path, PathBuf}; +use tokio_util::codec::Encoder; +use vector_core::{ + config::DataType, + event::{Event, Value}, + schema, +}; + +fn get_message_descriptor( + descriptor_set_path: &Path, + message_type: &str, +) -> vector_common::Result { + let b = std::fs::read(descriptor_set_path).map_err(|e| { + format!("Failed to open protobuf desc file '{descriptor_set_path:?}': {e}",) + })?; + let pool = DescriptorPool::decode(b.as_slice()).map_err(|e| { + format!("Failed to parse protobuf desc file '{descriptor_set_path:?}': {e}") + })?; + pool.get_message_by_name(message_type).ok_or_else(|| { + format!("The message type '{message_type}' could not be found in '{descriptor_set_path:?}'") + .into() + }) +} + +/// Config used to build a `ProtobufSerializer`. +#[crate::configurable_component] +#[derive(Debug, Clone)] +pub struct ProtobufSerializerConfig { + /// Options for the Protobuf serializer. + pub protobuf: ProtobufSerializerOptions, +} + +impl ProtobufSerializerConfig { + /// Build the `ProtobufSerializer` from this configuration. + pub fn build(&self) -> Result { + let message_descriptor = get_message_descriptor( + &self.protobuf.descriptor_set_path, + &self.protobuf.message_type, + )?; + Ok(ProtobufSerializer { message_descriptor }) + } + + /// The data type of events that are accepted by `ProtobufSerializer`. + pub fn input_type(&self) -> DataType { + DataType::Log.and(DataType::Trace) + } + + /// The schema required by the serializer. + pub fn schema_requirement(&self) -> schema::Requirement { + // While technically we support `Value` variants that can't be losslessly serialized to + // Protobuf, we don't want to enforce that limitation to users yet. + schema::Requirement::empty() + } +} + +/// Protobuf serializer options. +#[crate::configurable_component] +#[derive(Debug, Clone)] +pub struct ProtobufSerializerOptions { + /// The path to the protobuf descriptor set file. + /// + /// This file is the output of `protoc -o ...` + #[configurable(metadata(docs::examples = "/etc/vector/protobuf_descriptor_set.desc"))] + pub descriptor_set_path: PathBuf, + + /// The name of the message type to use for serializing. + #[configurable(metadata(docs::examples = "package.Message"))] + pub message_type: String, +} + +/// Serializer that converts an `Event` to bytes using the Protobuf format. +#[derive(Debug, Clone)] +pub struct ProtobufSerializer { + /// The protobuf message definition to use for serializtion. + message_descriptor: MessageDescriptor, +} + +/// Convert a single raw vector `Value` into a protobuf `Value`. +/// +/// Unlike `convert_value`, this ignores any field metadata such as cardinality. +fn convert_value_raw( + value: Value, + kind: &prost_reflect::Kind, +) -> Result { + let kind_str = value.kind_str().to_owned(); + match (value, kind) { + (Value::Boolean(b), Kind::Bool) => Ok(prost_reflect::Value::Bool(b)), + (Value::Bytes(b), Kind::Bytes) => Ok(prost_reflect::Value::Bytes(b)), + (Value::Bytes(b), Kind::String) => Ok(prost_reflect::Value::String( + String::from_utf8_lossy(&b).into_owned(), + )), + (Value::Float(f), Kind::Double) => Ok(prost_reflect::Value::F64(f.into_inner())), + (Value::Float(f), Kind::Float) => Ok(prost_reflect::Value::F32(f.into_inner() as f32)), + (Value::Integer(i), Kind::Int32) => Ok(prost_reflect::Value::I32(i as i32)), + (Value::Integer(i), Kind::Int64) => Ok(prost_reflect::Value::I64(i)), + (Value::Integer(i), Kind::Sint32) => Ok(prost_reflect::Value::I32(i as i32)), + (Value::Integer(i), Kind::Sint64) => Ok(prost_reflect::Value::I64(i)), + (Value::Integer(i), Kind::Sfixed32) => Ok(prost_reflect::Value::I32(i as i32)), + (Value::Integer(i), Kind::Sfixed64) => Ok(prost_reflect::Value::I64(i)), + (Value::Integer(i), Kind::Uint32) => Ok(prost_reflect::Value::U32(i as u32)), + (Value::Integer(i), Kind::Uint64) => Ok(prost_reflect::Value::U64(i as u64)), + (Value::Integer(i), Kind::Fixed32) => Ok(prost_reflect::Value::U32(i as u32)), + (Value::Integer(i), Kind::Fixed64) => Ok(prost_reflect::Value::U64(i as u64)), + (Value::Object(o), Kind::Message(message_descriptor)) => Ok(prost_reflect::Value::Message( + encode_message(message_descriptor, Value::Object(o))?, + )), + (Value::Regex(r), Kind::String) => Ok(prost_reflect::Value::String(r.as_str().to_owned())), + (Value::Regex(r), Kind::Bytes) => Ok(prost_reflect::Value::Bytes(r.as_bytes())), + (Value::Timestamp(t), Kind::Int64) => Ok(prost_reflect::Value::I64(t.timestamp_micros())), + _ => Err(format!("Cannot encode vector `{kind_str}` into protobuf `{kind:?}`",).into()), + } +} + +/// Convert a vector `Value` into a protobuf `Value`. +fn convert_value( + field_descriptor: &FieldDescriptor, + value: Value, +) -> Result { + if let Value::Array(a) = value { + if field_descriptor.cardinality() == prost_reflect::Cardinality::Repeated { + let repeated: Result, vector_common::Error> = a + .into_iter() + .map(|v| convert_value_raw(v, &field_descriptor.kind())) + .collect(); + Ok(prost_reflect::Value::List(repeated?)) + } else { + Err("Cannot encode vector array into a non-repeated protobuf field".into()) + } + } else { + convert_value_raw(value, &field_descriptor.kind()) + } +} + +fn encode_message( + message_descriptor: &MessageDescriptor, + value: Value, +) -> Result { + let mut message = DynamicMessage::new(message_descriptor.clone()); + if let Value::Object(map) = value { + for field in message_descriptor.fields() { + match map.get(field.name()) { + None | Some(Value::Null) => message.clear_field(&field), + Some(value) => { + message.try_set_field(&field, convert_value(&field, value.clone())?)? + } + } + } + Ok(message) + } else { + Err("ProtobufSerializer only supports serializing objects".into()) + } +} + +impl ProtobufSerializer { + /// Creates a new `ProtobufSerializer`. + pub fn new(message_descriptor: MessageDescriptor) -> Self { + Self { message_descriptor } + } + + /// Get a description of the message type used in serialization. + pub fn descriptor_proto(&self) -> &prost_reflect::prost_types::DescriptorProto { + self.message_descriptor.descriptor_proto() + } +} + +impl Encoder for ProtobufSerializer { + type Error = vector_common::Error; + + fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> { + let message = match event { + Event::Log(log) => encode_message(&self.message_descriptor, log.into_parts().0), + Event::Metric(_) => unimplemented!(), + Event::Trace(trace) => encode_message( + &self.message_descriptor, + Value::Object(trace.into_parts().0), + ), + }?; + message.encode(buffer).map_err(Into::into) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + use ordered_float::NotNan; + use std::collections::BTreeMap; + + macro_rules! mfield { + ($m:expr, $f:expr) => { + $m.get_field_by_name($f).unwrap().into_owned() + }; + } + + fn test_message_descriptor(message_type: &str) -> MessageDescriptor { + let path = PathBuf::from(std::env::var_os("CARGO_MANIFEST_DIR").unwrap()) + .join("tests/data/encoding/protobuf/test.desc"); + get_message_descriptor(&path, &format!("test.{message_type}")).unwrap() + } + + #[test] + fn test_encode_integers() { + let message = encode_message( + &test_message_descriptor("Integers"), + Value::Object(BTreeMap::from([ + ("i32".into(), Value::Integer(-1234)), + ("i64".into(), Value::Integer(-9876)), + ("u32".into(), Value::Integer(1234)), + ("u64".into(), Value::Integer(9876)), + ])), + ) + .unwrap(); + assert!(mfield!(message, "i32").as_i32() == Some(-1234)); + assert!(mfield!(message, "i64").as_i64() == Some(-9876)); + assert!(mfield!(message, "u32").as_u32() == Some(1234)); + assert!(mfield!(message, "u64").as_u64() == Some(9876)); + } + + #[test] + fn test_encode_floats() { + let message = encode_message( + &test_message_descriptor("Floats"), + Value::Object(BTreeMap::from([ + ("d".into(), Value::Float(NotNan::new(11.0).unwrap())), + ("f".into(), Value::Float(NotNan::new(2.0).unwrap())), + ])), + ) + .unwrap(); + assert!(mfield!(message, "d").as_f64() == Some(11.0)); + assert!(mfield!(message, "f").as_f32() == Some(2.0)); + } + + #[test] + fn test_encode_bytes() { + let bytes = Bytes::from(vec![0, 1, 2, 3]); + let message = encode_message( + &test_message_descriptor("Bytes"), + Value::Object(BTreeMap::from([ + ("text".into(), Value::Bytes(Bytes::from("vector"))), + ("binary".into(), Value::Bytes(bytes.clone())), + ])), + ) + .unwrap(); + assert!(mfield!(message, "text").as_str() == Some("vector")); + assert!(mfield!(message, "binary").as_bytes() == Some(&bytes)); + } + + #[test] + fn test_encode_repeated_primitive() { + let message = encode_message( + &test_message_descriptor("RepeatedPrimitive"), + Value::Object(BTreeMap::from([( + "numbers".into(), + Value::Array(vec![ + Value::Integer(8), + Value::Integer(6), + Value::Integer(4), + ]), + )])), + ) + .unwrap(); + let list = mfield!(message, "numbers").as_list().unwrap().to_vec(); + assert!(list.len() == 3); + assert!(list[0].as_i64() == Some(8)); + assert!(list[1].as_i64() == Some(6)); + assert!(list[2].as_i64() == Some(4)); + } + + #[test] + fn test_encode_repeated_message() { + let message = encode_message( + &test_message_descriptor("RepeatedMessage"), + Value::Object(BTreeMap::from([( + "messages".into(), + Value::Array(vec![ + Value::Object(BTreeMap::from([( + "text".into(), + Value::Bytes(Bytes::from("vector")), + )])), + Value::Object(BTreeMap::from([("index".into(), Value::Integer(4444))])), + Value::Object(BTreeMap::from([ + ("text".into(), Value::Bytes(Bytes::from("protobuf"))), + ("index".into(), Value::Integer(1)), + ])), + ]), + )])), + ) + .unwrap(); + let list = mfield!(message, "messages").as_list().unwrap().to_vec(); + assert!(list.len() == 3); + assert!(mfield!(list[0].as_message().unwrap(), "text").as_str() == Some("vector")); + assert!(!list[0].as_message().unwrap().has_field_by_name("index")); + assert!(!list[1].as_message().unwrap().has_field_by_name("t4ext")); + assert!(mfield!(list[1].as_message().unwrap(), "index").as_u32() == Some(4444)); + assert!(mfield!(list[2].as_message().unwrap(), "text").as_str() == Some("protobuf")); + assert!(mfield!(list[2].as_message().unwrap(), "index").as_u32() == Some(1)); + } + + #[test] + fn test_encode_decoding_protobuf_test_data() { + let test_data_dir = PathBuf::from(std::env::var_os("CARGO_MANIFEST_DIR").unwrap()) + .join("tests/data/decoding/protobuf"); + + // test_protobuf (proto2) + let descriptor_set_path = test_data_dir.join("test_protobuf.desc"); + let message_type = "test_protobuf.Person"; + let message_descriptor = + get_message_descriptor(&descriptor_set_path, message_type).unwrap(); + // just check for the side-effect of success + encode_message( + &message_descriptor, + Value::Object(BTreeMap::from([ + ("name".into(), Value::Bytes(Bytes::from("gina"))), + ("id".into(), Value::Integer(9271)), + ])), + ) + .unwrap(); + + // test_protobuf (proto3) + let descriptor_set_path = test_data_dir.join("test_protobuf3.desc"); + let message_type = "test_protobuf3.Person"; + let message_descriptor = + get_message_descriptor(&descriptor_set_path, message_type).unwrap(); + // just check for the side-effect of success + encode_message( + &message_descriptor, + Value::Object(BTreeMap::from([ + ("name".into(), Value::Bytes(Bytes::from("gina"))), + ("id".into(), Value::Integer(9271)), + // TODO: + /*("data".into(), Value::Object(BTreeMap::from([ + ("one".into(), Value::) + ])))*/ + ])), + ) + .unwrap(); + } +} diff --git a/lib/codecs/src/encoding/mod.rs b/lib/codecs/src/encoding/mod.rs index f9516411720d1..098613c877cfb 100644 --- a/lib/codecs/src/encoding/mod.rs +++ b/lib/codecs/src/encoding/mod.rs @@ -11,7 +11,8 @@ pub use format::{ AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig, JsonSerializer, JsonSerializerConfig, LogfmtSerializer, LogfmtSerializerConfig, NativeJsonSerializer, - NativeJsonSerializerConfig, NativeSerializer, NativeSerializerConfig, RawMessageSerializer, + NativeJsonSerializerConfig, NativeSerializer, NativeSerializerConfig, ProtobufSerializer, + ProtobufSerializerConfig, ProtobufSerializerOptions, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer, TextSerializerConfig, }; pub use framing::{ @@ -222,6 +223,11 @@ pub enum SerializerConfig { /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs NativeJson, + /// Encodes an event as a [Protobuf][protobuf] message. + /// + /// [protobuf]: https://protobuf.dev/ + Protobuf(ProtobufSerializerConfig), + /// No encoding. /// /// This encoding uses the `message` field of a log event. @@ -284,6 +290,12 @@ impl From for SerializerConfig { } } +impl From for SerializerConfig { + fn from(config: ProtobufSerializerConfig) -> Self { + Self::Protobuf(config) + } +} + impl From for SerializerConfig { fn from(_: RawMessageSerializerConfig) -> Self { Self::RawMessage @@ -311,6 +323,7 @@ impl SerializerConfig { SerializerConfig::NativeJson => { Ok(Serializer::NativeJson(NativeJsonSerializerConfig.build())) } + SerializerConfig::Protobuf(config) => Ok(Serializer::Protobuf(config.build()?)), SerializerConfig::RawMessage => { Ok(Serializer::RawMessage(RawMessageSerializerConfig.build())) } @@ -332,9 +345,9 @@ impl SerializerConfig { // we should do so accurately, even if practically it doesn't need to be. // // [1]: https://avro.apache.org/docs/1.11.1/specification/_print/#message-framing - SerializerConfig::Avro { .. } | SerializerConfig::Native => { - FramingConfig::LengthDelimited - } + SerializerConfig::Avro { .. } + | SerializerConfig::Native + | SerializerConfig::Protobuf(_) => FramingConfig::LengthDelimited, SerializerConfig::Csv(_) | SerializerConfig::Gelf | SerializerConfig::Json(_) @@ -357,6 +370,7 @@ impl SerializerConfig { SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(), SerializerConfig::Native => NativeSerializerConfig.input_type(), SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(), + SerializerConfig::Protobuf(config) => config.input_type(), SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(), SerializerConfig::Text(config) => config.input_type(), } @@ -374,6 +388,7 @@ impl SerializerConfig { SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(), SerializerConfig::Native => NativeSerializerConfig.schema_requirement(), SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(), + SerializerConfig::Protobuf(config) => config.schema_requirement(), SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(), SerializerConfig::Text(config) => config.schema_requirement(), } @@ -397,6 +412,8 @@ pub enum Serializer { Native(NativeSerializer), /// Uses a `NativeJsonSerializer` for serialization. NativeJson(NativeJsonSerializer), + /// Uses a `ProtobufSerializer` for serialization. + Protobuf(ProtobufSerializer), /// Uses a `RawMessageSerializer` for serialization. RawMessage(RawMessageSerializer), /// Uses a `TextSerializer` for serialization. @@ -413,6 +430,7 @@ impl Serializer { | Serializer::Logfmt(_) | Serializer::Text(_) | Serializer::Native(_) + | Serializer::Protobuf(_) | Serializer::RawMessage(_) => false, } } @@ -433,6 +451,7 @@ impl Serializer { | Serializer::Logfmt(_) | Serializer::Text(_) | Serializer::Native(_) + | Serializer::Protobuf(_) | Serializer::RawMessage(_) => { panic!("Serializer does not support JSON") } @@ -482,6 +501,12 @@ impl From for Serializer { } } +impl From for Serializer { + fn from(serializer: ProtobufSerializer) -> Self { + Self::Protobuf(serializer) + } +} + impl From for Serializer { fn from(serializer: RawMessageSerializer) -> Self { Self::RawMessage(serializer) @@ -506,6 +531,7 @@ impl tokio_util::codec::Encoder for Serializer { Serializer::Logfmt(serializer) => serializer.encode(event, buffer), Serializer::Native(serializer) => serializer.encode(event, buffer), Serializer::NativeJson(serializer) => serializer.encode(event, buffer), + Serializer::Protobuf(serializer) => serializer.encode(event, buffer), Serializer::RawMessage(serializer) => serializer.encode(event, buffer), Serializer::Text(serializer) => serializer.encode(event, buffer), } diff --git a/lib/codecs/tests/data/encoding/protobuf/test.desc b/lib/codecs/tests/data/encoding/protobuf/test.desc new file mode 100644 index 0000000000000000000000000000000000000000..3e1b6fcc085a89f06e91800691d077f4bc3694b6 GIT binary patch literal 457 zcmZ8e%TB{E5X8-+WocFL0Rkr!f#5HZwar+OgrsgF7R&(TU)OC$jpeg{tPsvZ+m7u4aiHjvS-D#W924OJ(F-HkzJ- zr_ymmuE~ACEObhqMIBT;RPxL!EA)7%448#ZL647$k4m1!HxNF5*4${?2O?Y^VMJIq zb6FimW^oEZvBeuG1w3VAobQ-9OwS=KRmGdF^Y__~#Sd`CC0jW0szB?PJxa^s2&CWN&VM@1Kcscl~LAe=f=8M?noe*xy_c$feH literal 0 HcmV?d00001 diff --git a/lib/codecs/tests/data/encoding/protobuf/test.proto b/lib/codecs/tests/data/encoding/protobuf/test.proto new file mode 100644 index 0000000000000..641fa4e83662e --- /dev/null +++ b/lib/codecs/tests/data/encoding/protobuf/test.proto @@ -0,0 +1,32 @@ +syntax = "proto3"; + +package test; + +message Integers { + int32 i32 = 1; + int64 i64 = 2; + uint32 u32 = 3; + uint64 u64 = 4; +} + +message Floats { + double d = 1; + float f = 2; +} + +message Bytes { + string text = 1; + bytes binary = 2; +} + +message RepeatedPrimitive { + repeated int64 numbers = 1; +} + +message RepeatedMessage { + message EmbeddedMessage { + optional string text = 1; + optional uint32 index = 2; + } + repeated EmbeddedMessage messages = 1; +} diff --git a/src/codecs/encoding/config.rs b/src/codecs/encoding/config.rs index 0ae7b1648a026..c2e6b57106c4f 100644 --- a/src/codecs/encoding/config.rs +++ b/src/codecs/encoding/config.rs @@ -112,6 +112,11 @@ impl EncodingConfigWithFraming { // https://github.com/Graylog2/graylog2-server/issues/1240 CharacterDelimitedEncoder::new(0).into() } + (None, Serializer::Protobuf(_)) => { + // Protobuf uses length-delimited messages, see: + // https://developers.google.com/protocol-buffers/docs/techniques#streaming + LengthDelimitedEncoder::new().into() + } ( None, Serializer::Csv(_) diff --git a/src/codecs/encoding/encoder.rs b/src/codecs/encoding/encoder.rs index 3f9970fb961a2..f7bfe086e2085 100644 --- a/src/codecs/encoding/encoder.rs +++ b/src/codecs/encoding/encoder.rs @@ -113,7 +113,7 @@ impl Encoder { Serializer::Gelf(_) | Serializer::Json(_) | Serializer::NativeJson(_), Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }), ) => "application/json", - (Serializer::Native(_), _) => "application/octet-stream", + (Serializer::Native(_), _) | (Serializer::Protobuf(_), _) => "application/octet-stream", ( Serializer::Avro(_) | Serializer::Csv(_) diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index 105503bf2f4ea..85ac25f9fad15 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -142,7 +142,14 @@ fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::S // `message` field... but it's close enough for now. DeserializerConfig::Bytes => SerializerConfig::Text(TextSerializerConfig::default()), DeserializerConfig::Json { .. } => SerializerConfig::Json(JsonSerializerConfig::default()), - DeserializerConfig::Protobuf(_) => unimplemented!(), + DeserializerConfig::Protobuf(config) => { + SerializerConfig::Protobuf(codecs::encoding::ProtobufSerializerConfig { + protobuf: codecs::encoding::ProtobufSerializerOptions { + descriptor_set_path: config.protobuf.desc_file.clone(), + message_type: config.protobuf.message_type.clone(), + }, + }) + } // TODO: We need to create an Avro serializer because, certainly, for any source decoding // the data as Avro, we can't possibly send anything else without the source just // immediately barfing. @@ -189,6 +196,14 @@ fn serializer_config_to_deserializer( SerializerConfig::Logfmt => todo!(), SerializerConfig::Native => DeserializerConfig::Native, SerializerConfig::NativeJson => DeserializerConfig::NativeJson(Default::default()), + SerializerConfig::Protobuf(config) => { + DeserializerConfig::Protobuf(codecs::decoding::ProtobufDeserializerConfig { + protobuf: codecs::decoding::ProtobufDeserializerOptions { + desc_file: config.protobuf.descriptor_set_path.clone(), + message_type: config.protobuf.message_type.clone(), + }, + }) + } SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes, }; diff --git a/src/sinks/websocket/sink.rs b/src/sinks/websocket/sink.rs index 9cb00d2b53ea9..93590c6c4ba96 100644 --- a/src/sinks/websocket/sink.rs +++ b/src/sinks/websocket/sink.rs @@ -235,11 +235,11 @@ impl WebSocketSink { const fn should_encode_as_binary(&self) -> bool { use codecs::encoding::Serializer::{ - Avro, Csv, Gelf, Json, Logfmt, Native, NativeJson, RawMessage, Text, + Avro, Csv, Gelf, Json, Logfmt, Native, NativeJson, Protobuf, RawMessage, Text, }; match self.encoder.serializer() { - RawMessage(_) | Avro(_) | Native(_) => true, + RawMessage(_) | Avro(_) | Native(_) | Protobuf(_) => true, Csv(_) | Logfmt(_) | Gelf(_) | Json(_) | Text(_) | NativeJson(_) => false, } } diff --git a/website/cue/reference/components/sinks/base/amqp.cue b/website/cue/reference/components/sinks/base/amqp.cue index 213a2e226c245..f5140cdb7cb24 100644 --- a/website/cue/reference/components/sinks/base/amqp.cue +++ b/website/cue/reference/components/sinks/base/amqp.cue @@ -102,6 +102,11 @@ base: components: sinks: amqp: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -236,6 +241,27 @@ base: components: sinks: amqp: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue b/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue index e18453f47703d..83e03e7c36844 100644 --- a/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue +++ b/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue @@ -273,6 +273,11 @@ base: components: sinks: aws_cloudwatch_logs: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -407,6 +412,27 @@ base: components: sinks: aws_cloudwatch_logs: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue b/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue index 71bdc26dc6b30..a67ab975eaac0 100644 --- a/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue +++ b/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue @@ -252,6 +252,11 @@ base: components: sinks: aws_kinesis_firehose: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -386,6 +391,27 @@ base: components: sinks: aws_kinesis_firehose: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue b/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue index 77b8fe21a94f8..97bdc302499e6 100644 --- a/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue +++ b/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue @@ -252,6 +252,11 @@ base: components: sinks: aws_kinesis_streams: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -386,6 +391,27 @@ base: components: sinks: aws_kinesis_streams: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/aws_s3.cue b/website/cue/reference/components/sinks/base/aws_s3.cue index 802bf4111a340..9349b7ec102de 100644 --- a/website/cue/reference/components/sinks/base/aws_s3.cue +++ b/website/cue/reference/components/sinks/base/aws_s3.cue @@ -361,6 +361,11 @@ base: components: sinks: aws_s3: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -495,6 +500,27 @@ base: components: sinks: aws_s3: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/aws_sns.cue b/website/cue/reference/components/sinks/base/aws_sns.cue index 3cde47292280d..d2868f2eccd18 100644 --- a/website/cue/reference/components/sinks/base/aws_sns.cue +++ b/website/cue/reference/components/sinks/base/aws_sns.cue @@ -188,6 +188,11 @@ base: components: sinks: aws_sns: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -322,6 +327,27 @@ base: components: sinks: aws_sns: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/aws_sqs.cue b/website/cue/reference/components/sinks/base/aws_sqs.cue index 4a9cf238eb0e7..59a7c2da107cd 100644 --- a/website/cue/reference/components/sinks/base/aws_sqs.cue +++ b/website/cue/reference/components/sinks/base/aws_sqs.cue @@ -188,6 +188,11 @@ base: components: sinks: aws_sqs: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -322,6 +327,27 @@ base: components: sinks: aws_sqs: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/azure_blob.cue b/website/cue/reference/components/sinks/base/azure_blob.cue index df84255dec51e..739b27fede234 100644 --- a/website/cue/reference/components/sinks/base/azure_blob.cue +++ b/website/cue/reference/components/sinks/base/azure_blob.cue @@ -215,6 +215,11 @@ base: components: sinks: azure_blob: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -349,6 +354,27 @@ base: components: sinks: azure_blob: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/console.cue b/website/cue/reference/components/sinks/base/console.cue index 5b22906133e9d..b9c3a1d7d3bd3 100644 --- a/website/cue/reference/components/sinks/base/console.cue +++ b/website/cue/reference/components/sinks/base/console.cue @@ -86,6 +86,11 @@ base: components: sinks: console: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -220,6 +225,27 @@ base: components: sinks: console: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/file.cue b/website/cue/reference/components/sinks/base/file.cue index 0c2696db0c8b2..b4e0e84bab323 100644 --- a/website/cue/reference/components/sinks/base/file.cue +++ b/website/cue/reference/components/sinks/base/file.cue @@ -106,6 +106,11 @@ base: components: sinks: file: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -240,6 +245,27 @@ base: components: sinks: file: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/gcp_chronicle_unstructured.cue b/website/cue/reference/components/sinks/base/gcp_chronicle_unstructured.cue index d34ce7b9d3187..4bd205e949662 100644 --- a/website/cue/reference/components/sinks/base/gcp_chronicle_unstructured.cue +++ b/website/cue/reference/components/sinks/base/gcp_chronicle_unstructured.cue @@ -155,6 +155,11 @@ base: components: sinks: gcp_chronicle_unstructured: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -289,6 +294,27 @@ base: components: sinks: gcp_chronicle_unstructured: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue b/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue index dc3e43c5aeb8e..eacffe55b35ee 100644 --- a/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue +++ b/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue @@ -239,6 +239,11 @@ base: components: sinks: gcp_cloud_storage: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -373,6 +378,27 @@ base: components: sinks: gcp_cloud_storage: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/gcp_pubsub.cue b/website/cue/reference/components/sinks/base/gcp_pubsub.cue index eae172d862485..f5e8dba4466e8 100644 --- a/website/cue/reference/components/sinks/base/gcp_pubsub.cue +++ b/website/cue/reference/components/sinks/base/gcp_pubsub.cue @@ -153,6 +153,11 @@ base: components: sinks: gcp_pubsub: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -287,6 +292,27 @@ base: components: sinks: gcp_pubsub: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/http.cue b/website/cue/reference/components/sinks/base/http.cue index 1b2d54839ac06..61834d418037c 100644 --- a/website/cue/reference/components/sinks/base/http.cue +++ b/website/cue/reference/components/sinks/base/http.cue @@ -194,6 +194,11 @@ base: components: sinks: http: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -328,6 +333,27 @@ base: components: sinks: http: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/humio_logs.cue b/website/cue/reference/components/sinks/base/humio_logs.cue index fd60d17745e7b..2889cb0b7b09b 100644 --- a/website/cue/reference/components/sinks/base/humio_logs.cue +++ b/website/cue/reference/components/sinks/base/humio_logs.cue @@ -147,6 +147,11 @@ base: components: sinks: humio_logs: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -281,6 +286,27 @@ base: components: sinks: humio_logs: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/kafka.cue b/website/cue/reference/components/sinks/base/kafka.cue index 40486db2f2824..3f370cdde2b17 100644 --- a/website/cue/reference/components/sinks/base/kafka.cue +++ b/website/cue/reference/components/sinks/base/kafka.cue @@ -141,6 +141,11 @@ base: components: sinks: kafka: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -275,6 +280,27 @@ base: components: sinks: kafka: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/loki.cue b/website/cue/reference/components/sinks/base/loki.cue index bf3aad7a2c162..bf042b88d2f3d 100644 --- a/website/cue/reference/components/sinks/base/loki.cue +++ b/website/cue/reference/components/sinks/base/loki.cue @@ -198,6 +198,11 @@ base: components: sinks: loki: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -332,6 +337,27 @@ base: components: sinks: loki: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/nats.cue b/website/cue/reference/components/sinks/base/nats.cue index b6895c9990873..a51c8288eee0e 100644 --- a/website/cue/reference/components/sinks/base/nats.cue +++ b/website/cue/reference/components/sinks/base/nats.cue @@ -186,6 +186,11 @@ base: components: sinks: nats: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -320,6 +325,27 @@ base: components: sinks: nats: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/papertrail.cue b/website/cue/reference/components/sinks/base/papertrail.cue index 746ce8cf7d5a7..45560f36fc41e 100644 --- a/website/cue/reference/components/sinks/base/papertrail.cue +++ b/website/cue/reference/components/sinks/base/papertrail.cue @@ -86,6 +86,11 @@ base: components: sinks: papertrail: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -220,6 +225,27 @@ base: components: sinks: papertrail: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/pulsar.cue b/website/cue/reference/components/sinks/base/pulsar.cue index c528b90805739..7c1d3c6682452 100644 --- a/website/cue/reference/components/sinks/base/pulsar.cue +++ b/website/cue/reference/components/sinks/base/pulsar.cue @@ -180,6 +180,11 @@ base: components: sinks: pulsar: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -314,6 +319,27 @@ base: components: sinks: pulsar: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/redis.cue b/website/cue/reference/components/sinks/base/redis.cue index eeb31cacf99b4..e69a6a11bb51a 100644 --- a/website/cue/reference/components/sinks/base/redis.cue +++ b/website/cue/reference/components/sinks/base/redis.cue @@ -139,6 +139,11 @@ base: components: sinks: redis: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -273,6 +278,27 @@ base: components: sinks: redis: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/socket.cue b/website/cue/reference/components/sinks/base/socket.cue index bb0da606ac175..840fc0219e0c0 100644 --- a/website/cue/reference/components/sinks/base/socket.cue +++ b/website/cue/reference/components/sinks/base/socket.cue @@ -98,6 +98,11 @@ base: components: sinks: socket: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -232,6 +237,27 @@ base: components: sinks: socket: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/splunk_hec_logs.cue b/website/cue/reference/components/sinks/base/splunk_hec_logs.cue index 1310e15065c15..b37b6cd8a3ec9 100644 --- a/website/cue/reference/components/sinks/base/splunk_hec_logs.cue +++ b/website/cue/reference/components/sinks/base/splunk_hec_logs.cue @@ -197,6 +197,11 @@ base: components: sinks: splunk_hec_logs: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -331,6 +336,27 @@ base: components: sinks: splunk_hec_logs: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/webhdfs.cue b/website/cue/reference/components/sinks/base/webhdfs.cue index caec116384336..4053a27c7e1a5 100644 --- a/website/cue/reference/components/sinks/base/webhdfs.cue +++ b/website/cue/reference/components/sinks/base/webhdfs.cue @@ -147,6 +147,11 @@ base: components: sinks: webhdfs: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -281,6 +286,27 @@ base: components: sinks: webhdfs: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false diff --git a/website/cue/reference/components/sinks/base/websocket.cue b/website/cue/reference/components/sinks/base/websocket.cue index 8b3770715ceff..be22c2b58961a 100644 --- a/website/cue/reference/components/sinks/base/websocket.cue +++ b/website/cue/reference/components/sinks/base/websocket.cue @@ -133,6 +133,11 @@ base: components: sinks: websocket: configuration: { [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ raw_message: """ No encoding. @@ -267,6 +272,27 @@ base: components: sinks: websocket: configuration: { required: false type: array: items: type: string: {} } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + descriptor_set_path: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } timestamp_format: { description: "Format used for timestamp fields." required: false From 01171dfc702b3d51ef21d4bef1ce2a9a68a0950c Mon Sep 17 00:00:00 2001 From: Glen Oakley Date: Wed, 20 Sep 2023 18:32:36 +0000 Subject: [PATCH 02/17] rename descriptor_set_path to desc_file --- lib/codecs/src/encoding/format/protobuf.rs | 8 +++----- src/components/validation/resources/mod.rs | 4 ++-- website/cue/reference/components/sinks/base/amqp.cue | 2 +- .../components/sinks/base/aws_cloudwatch_logs.cue | 2 +- .../components/sinks/base/aws_kinesis_firehose.cue | 2 +- .../components/sinks/base/aws_kinesis_streams.cue | 2 +- website/cue/reference/components/sinks/base/aws_s3.cue | 2 +- website/cue/reference/components/sinks/base/aws_sns.cue | 2 +- website/cue/reference/components/sinks/base/aws_sqs.cue | 2 +- .../cue/reference/components/sinks/base/azure_blob.cue | 2 +- website/cue/reference/components/sinks/base/console.cue | 2 +- website/cue/reference/components/sinks/base/file.cue | 2 +- .../components/sinks/base/gcp_chronicle_unstructured.cue | 2 +- .../reference/components/sinks/base/gcp_cloud_storage.cue | 2 +- .../cue/reference/components/sinks/base/gcp_pubsub.cue | 2 +- website/cue/reference/components/sinks/base/http.cue | 2 +- .../cue/reference/components/sinks/base/humio_logs.cue | 2 +- website/cue/reference/components/sinks/base/kafka.cue | 2 +- website/cue/reference/components/sinks/base/loki.cue | 2 +- website/cue/reference/components/sinks/base/nats.cue | 2 +- .../cue/reference/components/sinks/base/papertrail.cue | 2 +- website/cue/reference/components/sinks/base/pulsar.cue | 2 +- website/cue/reference/components/sinks/base/redis.cue | 2 +- website/cue/reference/components/sinks/base/socket.cue | 2 +- .../reference/components/sinks/base/splunk_hec_logs.cue | 2 +- website/cue/reference/components/sinks/base/webhdfs.cue | 2 +- website/cue/reference/components/sinks/base/websocket.cue | 2 +- 27 files changed, 30 insertions(+), 32 deletions(-) diff --git a/lib/codecs/src/encoding/format/protobuf.rs b/lib/codecs/src/encoding/format/protobuf.rs index 6cdfcbcfa60a7..d46e4c215a2f7 100644 --- a/lib/codecs/src/encoding/format/protobuf.rs +++ b/lib/codecs/src/encoding/format/protobuf.rs @@ -37,10 +37,8 @@ pub struct ProtobufSerializerConfig { impl ProtobufSerializerConfig { /// Build the `ProtobufSerializer` from this configuration. pub fn build(&self) -> Result { - let message_descriptor = get_message_descriptor( - &self.protobuf.descriptor_set_path, - &self.protobuf.message_type, - )?; + let message_descriptor = + get_message_descriptor(&self.protobuf.desc_file, &self.protobuf.message_type)?; Ok(ProtobufSerializer { message_descriptor }) } @@ -65,7 +63,7 @@ pub struct ProtobufSerializerOptions { /// /// This file is the output of `protoc -o ...` #[configurable(metadata(docs::examples = "/etc/vector/protobuf_descriptor_set.desc"))] - pub descriptor_set_path: PathBuf, + pub desc_file: PathBuf, /// The name of the message type to use for serializing. #[configurable(metadata(docs::examples = "package.Message"))] diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index 85ac25f9fad15..b66e9832c0f0f 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -145,7 +145,7 @@ fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::S DeserializerConfig::Protobuf(config) => { SerializerConfig::Protobuf(codecs::encoding::ProtobufSerializerConfig { protobuf: codecs::encoding::ProtobufSerializerOptions { - descriptor_set_path: config.protobuf.desc_file.clone(), + desc_file: config.protobuf.desc_file.clone(), message_type: config.protobuf.message_type.clone(), }, }) @@ -199,7 +199,7 @@ fn serializer_config_to_deserializer( SerializerConfig::Protobuf(config) => { DeserializerConfig::Protobuf(codecs::decoding::ProtobufDeserializerConfig { protobuf: codecs::decoding::ProtobufDeserializerOptions { - desc_file: config.protobuf.descriptor_set_path.clone(), + desc_file: config.protobuf.desc_file.clone(), message_type: config.protobuf.message_type.clone(), }, }) diff --git a/website/cue/reference/components/sinks/base/amqp.cue b/website/cue/reference/components/sinks/base/amqp.cue index f5140cdb7cb24..962896680875b 100644 --- a/website/cue/reference/components/sinks/base/amqp.cue +++ b/website/cue/reference/components/sinks/base/amqp.cue @@ -246,7 +246,7 @@ base: components: sinks: amqp: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue b/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue index 83e03e7c36844..6cdb52d035560 100644 --- a/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue +++ b/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue @@ -417,7 +417,7 @@ base: components: sinks: aws_cloudwatch_logs: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue b/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue index a67ab975eaac0..7fc315f731801 100644 --- a/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue +++ b/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue @@ -396,7 +396,7 @@ base: components: sinks: aws_kinesis_firehose: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue b/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue index 97bdc302499e6..96cc7f51915ee 100644 --- a/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue +++ b/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue @@ -396,7 +396,7 @@ base: components: sinks: aws_kinesis_streams: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/aws_s3.cue b/website/cue/reference/components/sinks/base/aws_s3.cue index 9349b7ec102de..8249d52d82dfc 100644 --- a/website/cue/reference/components/sinks/base/aws_s3.cue +++ b/website/cue/reference/components/sinks/base/aws_s3.cue @@ -505,7 +505,7 @@ base: components: sinks: aws_s3: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/aws_sns.cue b/website/cue/reference/components/sinks/base/aws_sns.cue index d2868f2eccd18..f3cac4368d5f5 100644 --- a/website/cue/reference/components/sinks/base/aws_sns.cue +++ b/website/cue/reference/components/sinks/base/aws_sns.cue @@ -332,7 +332,7 @@ base: components: sinks: aws_sns: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/aws_sqs.cue b/website/cue/reference/components/sinks/base/aws_sqs.cue index 59a7c2da107cd..98d713016bd2c 100644 --- a/website/cue/reference/components/sinks/base/aws_sqs.cue +++ b/website/cue/reference/components/sinks/base/aws_sqs.cue @@ -332,7 +332,7 @@ base: components: sinks: aws_sqs: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/azure_blob.cue b/website/cue/reference/components/sinks/base/azure_blob.cue index 739b27fede234..4174e6c3af03e 100644 --- a/website/cue/reference/components/sinks/base/azure_blob.cue +++ b/website/cue/reference/components/sinks/base/azure_blob.cue @@ -359,7 +359,7 @@ base: components: sinks: azure_blob: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/console.cue b/website/cue/reference/components/sinks/base/console.cue index b9c3a1d7d3bd3..189ee6bb28fdf 100644 --- a/website/cue/reference/components/sinks/base/console.cue +++ b/website/cue/reference/components/sinks/base/console.cue @@ -230,7 +230,7 @@ base: components: sinks: console: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/file.cue b/website/cue/reference/components/sinks/base/file.cue index b4e0e84bab323..2b6c29120999b 100644 --- a/website/cue/reference/components/sinks/base/file.cue +++ b/website/cue/reference/components/sinks/base/file.cue @@ -250,7 +250,7 @@ base: components: sinks: file: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/gcp_chronicle_unstructured.cue b/website/cue/reference/components/sinks/base/gcp_chronicle_unstructured.cue index 4bd205e949662..d87bc6f880f19 100644 --- a/website/cue/reference/components/sinks/base/gcp_chronicle_unstructured.cue +++ b/website/cue/reference/components/sinks/base/gcp_chronicle_unstructured.cue @@ -299,7 +299,7 @@ base: components: sinks: gcp_chronicle_unstructured: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue b/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue index eacffe55b35ee..cb6e46d55b895 100644 --- a/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue +++ b/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue @@ -383,7 +383,7 @@ base: components: sinks: gcp_cloud_storage: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/gcp_pubsub.cue b/website/cue/reference/components/sinks/base/gcp_pubsub.cue index f5e8dba4466e8..8ee2303a1dceb 100644 --- a/website/cue/reference/components/sinks/base/gcp_pubsub.cue +++ b/website/cue/reference/components/sinks/base/gcp_pubsub.cue @@ -297,7 +297,7 @@ base: components: sinks: gcp_pubsub: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/http.cue b/website/cue/reference/components/sinks/base/http.cue index 61834d418037c..7259108caf318 100644 --- a/website/cue/reference/components/sinks/base/http.cue +++ b/website/cue/reference/components/sinks/base/http.cue @@ -338,7 +338,7 @@ base: components: sinks: http: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/humio_logs.cue b/website/cue/reference/components/sinks/base/humio_logs.cue index 2889cb0b7b09b..67976ae1543b0 100644 --- a/website/cue/reference/components/sinks/base/humio_logs.cue +++ b/website/cue/reference/components/sinks/base/humio_logs.cue @@ -291,7 +291,7 @@ base: components: sinks: humio_logs: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/kafka.cue b/website/cue/reference/components/sinks/base/kafka.cue index 3f370cdde2b17..261623b3603d2 100644 --- a/website/cue/reference/components/sinks/base/kafka.cue +++ b/website/cue/reference/components/sinks/base/kafka.cue @@ -285,7 +285,7 @@ base: components: sinks: kafka: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/loki.cue b/website/cue/reference/components/sinks/base/loki.cue index bf042b88d2f3d..18e59e76a9663 100644 --- a/website/cue/reference/components/sinks/base/loki.cue +++ b/website/cue/reference/components/sinks/base/loki.cue @@ -342,7 +342,7 @@ base: components: sinks: loki: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/nats.cue b/website/cue/reference/components/sinks/base/nats.cue index a51c8288eee0e..542a8fab3debd 100644 --- a/website/cue/reference/components/sinks/base/nats.cue +++ b/website/cue/reference/components/sinks/base/nats.cue @@ -330,7 +330,7 @@ base: components: sinks: nats: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/papertrail.cue b/website/cue/reference/components/sinks/base/papertrail.cue index 45560f36fc41e..a91cf60c7ca85 100644 --- a/website/cue/reference/components/sinks/base/papertrail.cue +++ b/website/cue/reference/components/sinks/base/papertrail.cue @@ -230,7 +230,7 @@ base: components: sinks: papertrail: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/pulsar.cue b/website/cue/reference/components/sinks/base/pulsar.cue index 7c1d3c6682452..03958e1ddd0cb 100644 --- a/website/cue/reference/components/sinks/base/pulsar.cue +++ b/website/cue/reference/components/sinks/base/pulsar.cue @@ -324,7 +324,7 @@ base: components: sinks: pulsar: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/redis.cue b/website/cue/reference/components/sinks/base/redis.cue index e69a6a11bb51a..2a8c157535b68 100644 --- a/website/cue/reference/components/sinks/base/redis.cue +++ b/website/cue/reference/components/sinks/base/redis.cue @@ -283,7 +283,7 @@ base: components: sinks: redis: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/socket.cue b/website/cue/reference/components/sinks/base/socket.cue index 840fc0219e0c0..aee0aa7943c94 100644 --- a/website/cue/reference/components/sinks/base/socket.cue +++ b/website/cue/reference/components/sinks/base/socket.cue @@ -242,7 +242,7 @@ base: components: sinks: socket: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/splunk_hec_logs.cue b/website/cue/reference/components/sinks/base/splunk_hec_logs.cue index b37b6cd8a3ec9..a6e9d0750b5d0 100644 --- a/website/cue/reference/components/sinks/base/splunk_hec_logs.cue +++ b/website/cue/reference/components/sinks/base/splunk_hec_logs.cue @@ -341,7 +341,7 @@ base: components: sinks: splunk_hec_logs: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/webhdfs.cue b/website/cue/reference/components/sinks/base/webhdfs.cue index 4053a27c7e1a5..6bc2589e79566 100644 --- a/website/cue/reference/components/sinks/base/webhdfs.cue +++ b/website/cue/reference/components/sinks/base/webhdfs.cue @@ -291,7 +291,7 @@ base: components: sinks: webhdfs: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. diff --git a/website/cue/reference/components/sinks/base/websocket.cue b/website/cue/reference/components/sinks/base/websocket.cue index be22c2b58961a..3d50b5e55ee15 100644 --- a/website/cue/reference/components/sinks/base/websocket.cue +++ b/website/cue/reference/components/sinks/base/websocket.cue @@ -277,7 +277,7 @@ base: components: sinks: websocket: configuration: { relevant_when: "codec = \"protobuf\"" required: true type: object: options: { - descriptor_set_path: { + desc_file: { description: """ The path to the protobuf descriptor set file. From 924123016832ef253e31fbe766007b7cf29b8b30 Mon Sep 17 00:00:00 2001 From: Glen Oakley Date: Wed, 20 Sep 2023 18:47:45 +0000 Subject: [PATCH 03/17] fix spelling errors --- lib/codecs/src/encoding/format/protobuf.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/codecs/src/encoding/format/protobuf.rs b/lib/codecs/src/encoding/format/protobuf.rs index d46e4c215a2f7..7da0c54a17860 100644 --- a/lib/codecs/src/encoding/format/protobuf.rs +++ b/lib/codecs/src/encoding/format/protobuf.rs @@ -73,7 +73,7 @@ pub struct ProtobufSerializerOptions { /// Serializer that converts an `Event` to bytes using the Protobuf format. #[derive(Debug, Clone)] pub struct ProtobufSerializer { - /// The protobuf message definition to use for serializtion. + /// The protobuf message definition to use for serialization. message_descriptor: MessageDescriptor, } @@ -312,7 +312,7 @@ mod tests { encode_message( &message_descriptor, Value::Object(BTreeMap::from([ - ("name".into(), Value::Bytes(Bytes::from("gina"))), + ("name".into(), Value::Bytes(Bytes::from("rope"))), ("id".into(), Value::Integer(9271)), ])), ) @@ -327,7 +327,7 @@ mod tests { encode_message( &message_descriptor, Value::Object(BTreeMap::from([ - ("name".into(), Value::Bytes(Bytes::from("gina"))), + ("name".into(), Value::Bytes(Bytes::from("rope"))), ("id".into(), Value::Integer(9271)), // TODO: /*("data".into(), Value::Object(BTreeMap::from([ From 6a1c01e6622d98e6dd1efd534fa07433e354c2d4 Mon Sep 17 00:00:00 2001 From: Glen Oakley Date: Wed, 20 Sep 2023 18:50:34 +0000 Subject: [PATCH 04/17] describe encode_message --- lib/codecs/src/encoding/format/protobuf.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/codecs/src/encoding/format/protobuf.rs b/lib/codecs/src/encoding/format/protobuf.rs index 7da0c54a17860..9cf11cd64764a 100644 --- a/lib/codecs/src/encoding/format/protobuf.rs +++ b/lib/codecs/src/encoding/format/protobuf.rs @@ -133,6 +133,11 @@ fn convert_value( } } +/// Convert a vector object (`Value`) into a protobuf message. +/// +/// This function can only operate on `Value::Object`s, +/// since they are the only field-based vector Value +/// and protobuf messages are defined as a collection of fields and values. fn encode_message( message_descriptor: &MessageDescriptor, value: Value, From 5a42f89b3b45e4efc652350f9d90763a1fc7ad83 Mon Sep 17 00:00:00 2001 From: Glen Oakley Date: Wed, 20 Sep 2023 18:56:18 +0000 Subject: [PATCH 05/17] factor out common test code --- lib/codecs/src/encoding/format/protobuf.rs | 37 ++++++++-------------- 1 file changed, 13 insertions(+), 24 deletions(-) diff --git a/lib/codecs/src/encoding/format/protobuf.rs b/lib/codecs/src/encoding/format/protobuf.rs index 9cf11cd64764a..d31df1271df95 100644 --- a/lib/codecs/src/encoding/format/protobuf.rs +++ b/lib/codecs/src/encoding/format/protobuf.rs @@ -303,17 +303,15 @@ mod tests { assert!(mfield!(list[2].as_message().unwrap(), "index").as_u32() == Some(1)); } - #[test] - fn test_encode_decoding_protobuf_test_data() { + fn run_encoding_on_decoding_test_data( + filename: &str, + message_type: &str, + ) -> Result { let test_data_dir = PathBuf::from(std::env::var_os("CARGO_MANIFEST_DIR").unwrap()) .join("tests/data/decoding/protobuf"); - - // test_protobuf (proto2) - let descriptor_set_path = test_data_dir.join("test_protobuf.desc"); - let message_type = "test_protobuf.Person"; + let descriptor_set_path = test_data_dir.join(filename); let message_descriptor = get_message_descriptor(&descriptor_set_path, message_type).unwrap(); - // just check for the side-effect of success encode_message( &message_descriptor, Value::Object(BTreeMap::from([ @@ -321,25 +319,16 @@ mod tests { ("id".into(), Value::Integer(9271)), ])), ) - .unwrap(); + } + + #[test] + fn test_encode_decoding_protobuf_test_data() { + // test_protobuf (proto2) + // just check for the side-effect of success + run_encoding_on_decoding_test_data("test_protobuf.desc", "test_protobuf.Person").unwrap(); // test_protobuf (proto3) - let descriptor_set_path = test_data_dir.join("test_protobuf3.desc"); - let message_type = "test_protobuf3.Person"; - let message_descriptor = - get_message_descriptor(&descriptor_set_path, message_type).unwrap(); // just check for the side-effect of success - encode_message( - &message_descriptor, - Value::Object(BTreeMap::from([ - ("name".into(), Value::Bytes(Bytes::from("rope"))), - ("id".into(), Value::Integer(9271)), - // TODO: - /*("data".into(), Value::Object(BTreeMap::from([ - ("one".into(), Value::) - ])))*/ - ])), - ) - .unwrap(); + run_encoding_on_decoding_test_data("test_protobuf3.desc", "test_protobuf3.Person").unwrap(); } } From d17a683b89d777ae88311b61d08ad5bb551d85d1 Mon Sep 17 00:00:00 2001 From: Glen Oakley Date: Wed, 20 Sep 2023 18:58:58 +0000 Subject: [PATCH 06/17] move protobuf test data to one directory --- lib/codecs/src/decoding/format/protobuf.rs | 2 +- lib/codecs/src/encoding/format/protobuf.rs | 4 ++-- .../data/{decoding => }/protobuf/person_someone.pb | 0 .../data/{decoding => }/protobuf/person_someone.txt | 0 .../data/{decoding => }/protobuf/person_someone3.pb | Bin .../{decoding => }/protobuf/person_someone3.txt | 0 .../tests/data/{encoding => }/protobuf/test.desc | Bin .../tests/data/{encoding => }/protobuf/test.proto | 0 .../data/{decoding => }/protobuf/test_protobuf.desc | Bin .../{decoding => }/protobuf/test_protobuf.proto | 0 .../{decoding => }/protobuf/test_protobuf3.desc | Bin .../{decoding => }/protobuf/test_protobuf3.proto | 0 12 files changed, 3 insertions(+), 3 deletions(-) rename lib/codecs/tests/data/{decoding => }/protobuf/person_someone.pb (100%) rename lib/codecs/tests/data/{decoding => }/protobuf/person_someone.txt (100%) rename lib/codecs/tests/data/{decoding => }/protobuf/person_someone3.pb (100%) rename lib/codecs/tests/data/{decoding => }/protobuf/person_someone3.txt (100%) rename lib/codecs/tests/data/{encoding => }/protobuf/test.desc (100%) rename lib/codecs/tests/data/{encoding => }/protobuf/test.proto (100%) rename lib/codecs/tests/data/{decoding => }/protobuf/test_protobuf.desc (100%) rename lib/codecs/tests/data/{decoding => }/protobuf/test_protobuf.proto (100%) rename lib/codecs/tests/data/{decoding => }/protobuf/test_protobuf3.desc (100%) rename lib/codecs/tests/data/{decoding => }/protobuf/test_protobuf3.proto (100%) diff --git a/lib/codecs/src/decoding/format/protobuf.rs b/lib/codecs/src/decoding/format/protobuf.rs index 0c55478189b31..5d898067fbeac 100644 --- a/lib/codecs/src/decoding/format/protobuf.rs +++ b/lib/codecs/src/decoding/format/protobuf.rs @@ -246,7 +246,7 @@ mod tests { fn test_data_dir() -> PathBuf { PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()) - .join("tests/data/decoding/protobuf") + .join("tests/data/protobuf") } fn parse_and_validate( diff --git a/lib/codecs/src/encoding/format/protobuf.rs b/lib/codecs/src/encoding/format/protobuf.rs index d31df1271df95..db2f009570db9 100644 --- a/lib/codecs/src/encoding/format/protobuf.rs +++ b/lib/codecs/src/encoding/format/protobuf.rs @@ -201,7 +201,7 @@ mod tests { fn test_message_descriptor(message_type: &str) -> MessageDescriptor { let path = PathBuf::from(std::env::var_os("CARGO_MANIFEST_DIR").unwrap()) - .join("tests/data/encoding/protobuf/test.desc"); + .join("tests/data/protobuf/test.desc"); get_message_descriptor(&path, &format!("test.{message_type}")).unwrap() } @@ -308,7 +308,7 @@ mod tests { message_type: &str, ) -> Result { let test_data_dir = PathBuf::from(std::env::var_os("CARGO_MANIFEST_DIR").unwrap()) - .join("tests/data/decoding/protobuf"); + .join("tests/data/protobuf"); let descriptor_set_path = test_data_dir.join(filename); let message_descriptor = get_message_descriptor(&descriptor_set_path, message_type).unwrap(); diff --git a/lib/codecs/tests/data/decoding/protobuf/person_someone.pb b/lib/codecs/tests/data/protobuf/person_someone.pb similarity index 100% rename from lib/codecs/tests/data/decoding/protobuf/person_someone.pb rename to lib/codecs/tests/data/protobuf/person_someone.pb diff --git a/lib/codecs/tests/data/decoding/protobuf/person_someone.txt b/lib/codecs/tests/data/protobuf/person_someone.txt similarity index 100% rename from lib/codecs/tests/data/decoding/protobuf/person_someone.txt rename to lib/codecs/tests/data/protobuf/person_someone.txt diff --git a/lib/codecs/tests/data/decoding/protobuf/person_someone3.pb b/lib/codecs/tests/data/protobuf/person_someone3.pb similarity index 100% rename from lib/codecs/tests/data/decoding/protobuf/person_someone3.pb rename to lib/codecs/tests/data/protobuf/person_someone3.pb diff --git a/lib/codecs/tests/data/decoding/protobuf/person_someone3.txt b/lib/codecs/tests/data/protobuf/person_someone3.txt similarity index 100% rename from lib/codecs/tests/data/decoding/protobuf/person_someone3.txt rename to lib/codecs/tests/data/protobuf/person_someone3.txt diff --git a/lib/codecs/tests/data/encoding/protobuf/test.desc b/lib/codecs/tests/data/protobuf/test.desc similarity index 100% rename from lib/codecs/tests/data/encoding/protobuf/test.desc rename to lib/codecs/tests/data/protobuf/test.desc diff --git a/lib/codecs/tests/data/encoding/protobuf/test.proto b/lib/codecs/tests/data/protobuf/test.proto similarity index 100% rename from lib/codecs/tests/data/encoding/protobuf/test.proto rename to lib/codecs/tests/data/protobuf/test.proto diff --git a/lib/codecs/tests/data/decoding/protobuf/test_protobuf.desc b/lib/codecs/tests/data/protobuf/test_protobuf.desc similarity index 100% rename from lib/codecs/tests/data/decoding/protobuf/test_protobuf.desc rename to lib/codecs/tests/data/protobuf/test_protobuf.desc diff --git a/lib/codecs/tests/data/decoding/protobuf/test_protobuf.proto b/lib/codecs/tests/data/protobuf/test_protobuf.proto similarity index 100% rename from lib/codecs/tests/data/decoding/protobuf/test_protobuf.proto rename to lib/codecs/tests/data/protobuf/test_protobuf.proto diff --git a/lib/codecs/tests/data/decoding/protobuf/test_protobuf3.desc b/lib/codecs/tests/data/protobuf/test_protobuf3.desc similarity index 100% rename from lib/codecs/tests/data/decoding/protobuf/test_protobuf3.desc rename to lib/codecs/tests/data/protobuf/test_protobuf3.desc diff --git a/lib/codecs/tests/data/decoding/protobuf/test_protobuf3.proto b/lib/codecs/tests/data/protobuf/test_protobuf3.proto similarity index 100% rename from lib/codecs/tests/data/decoding/protobuf/test_protobuf3.proto rename to lib/codecs/tests/data/protobuf/test_protobuf3.proto From e8758d451a7f26bed6b979a56959a2fe0fb3c5b6 Mon Sep 17 00:00:00 2001 From: Glen Oakley Date: Wed, 20 Sep 2023 20:14:51 +0000 Subject: [PATCH 07/17] create common protobuf module --- lib/codecs/src/common/mod.rs | 3 ++ lib/codecs/src/common/protobuf.rs | 36 +++++++++++++++++++++ lib/codecs/src/decoding/format/protobuf.rs | 37 ++++++---------------- lib/codecs/src/encoding/format/protobuf.rs | 21 ++---------- lib/codecs/src/lib.rs | 1 + 5 files changed, 52 insertions(+), 46 deletions(-) create mode 100644 lib/codecs/src/common/mod.rs create mode 100644 lib/codecs/src/common/protobuf.rs diff --git a/lib/codecs/src/common/mod.rs b/lib/codecs/src/common/mod.rs new file mode 100644 index 0000000000000..230f3b31d2f97 --- /dev/null +++ b/lib/codecs/src/common/mod.rs @@ -0,0 +1,3 @@ +//! A collection of common utility features used by both encoding and decoding logic. + +pub mod protobuf; diff --git a/lib/codecs/src/common/protobuf.rs b/lib/codecs/src/common/protobuf.rs new file mode 100644 index 0000000000000..c50321c211a23 --- /dev/null +++ b/lib/codecs/src/common/protobuf.rs @@ -0,0 +1,36 @@ +use prost_reflect::{DescriptorPool, MessageDescriptor}; +use std::path::Path; + +/// Load a `MessageDescriptor` from a specific message type from the given descriptor set file. +/// +/// The path should point to the output of `protoc -o ...` +pub fn get_message_descriptor( + descriptor_set_path: &Path, + message_type: &str, +) -> vector_common::Result { + let b = std::fs::read(descriptor_set_path).map_err(|e| { + format!("Failed to open protobuf desc file '{descriptor_set_path:?}': {e}",) + })?; + let pool = DescriptorPool::decode(b.as_slice()).map_err(|e| { + format!("Failed to parse protobuf desc file '{descriptor_set_path:?}': {e}") + })?; + pool.get_message_by_name(message_type).ok_or_else(|| { + format!("The message type '{message_type}' could not be found in '{descriptor_set_path:?}'") + .into() + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::path::PathBuf; + + #[test] + fn test_get_message_descriptor() { + let path = PathBuf::from(std::env::var_os("CARGO_MANIFEST_DIR").unwrap()) + .join("tests/data/protobuf/test.desc"); + let message_descriptor = get_message_descriptor(&path, "test.Integers").unwrap(); + assert_eq!("Integers", message_descriptor.name()); + assert_eq!(4, message_descriptor.fields().count()); + } +} diff --git a/lib/codecs/src/decoding/format/protobuf.rs b/lib/codecs/src/decoding/format/protobuf.rs index 5d898067fbeac..65be4990b0905 100644 --- a/lib/codecs/src/decoding/format/protobuf.rs +++ b/lib/codecs/src/decoding/format/protobuf.rs @@ -1,12 +1,11 @@ use std::collections::BTreeMap; -use std::fs; use std::path::PathBuf; use bytes::Bytes; use chrono::Utc; use derivative::Derivative; use ordered_float::NotNan; -use prost_reflect::{DescriptorPool, DynamicMessage, MessageDescriptor, ReflectMessage}; +use prost_reflect::{DynamicMessage, MessageDescriptor, ReflectMessage}; use smallvec::{smallvec, SmallVec}; use vector_config::configurable_component; use vector_core::event::LogEvent; @@ -17,6 +16,8 @@ use vector_core::{ }; use vrl::value::Kind; +use crate::common::protobuf::get_message_descriptor; + use super::Deserializer; /// Config used to build a `ProtobufDeserializer`. @@ -90,19 +91,6 @@ impl ProtobufDeserializer { pub fn new(message_descriptor: MessageDescriptor) -> Self { Self { message_descriptor } } - - fn get_message_descriptor( - desc_file: &PathBuf, - message_type: String, - ) -> vector_common::Result { - let b = fs::read(desc_file) - .map_err(|e| format!("Failed to open protobuf desc file '{desc_file:?}': {e}",))?; - let pool = DescriptorPool::decode(b.as_slice()) - .map_err(|e| format!("Failed to parse protobuf desc file '{desc_file:?}': {e}"))?; - Ok(pool.get_message_by_name(&message_type).unwrap_or_else(|| { - panic!("The message type '{message_type}' could not be found in '{desc_file:?}'") - })) - } } impl Deserializer for ProtobufDeserializer { @@ -137,10 +125,8 @@ impl Deserializer for ProtobufDeserializer { impl TryFrom<&ProtobufDeserializerConfig> for ProtobufDeserializer { type Error = vector_common::Error; fn try_from(config: &ProtobufDeserializerConfig) -> vector_common::Result { - let message_descriptor = ProtobufDeserializer::get_message_descriptor( - &config.protobuf.desc_file, - config.protobuf.message_type.clone(), - )?; + let message_descriptor = + get_message_descriptor(&config.protobuf.desc_file, &config.protobuf.message_type)?; Ok(Self::new(message_descriptor)) } } @@ -245,8 +231,7 @@ mod tests { use super::*; fn test_data_dir() -> PathBuf { - PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()) - .join("tests/data/protobuf") + PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf") } fn parse_and_validate( @@ -256,11 +241,7 @@ mod tests { validate_log: fn(&LogEvent), ) { let input = Bytes::from(protobuf_bin_message); - let message_descriptor = ProtobufDeserializer::get_message_descriptor( - &protobuf_desc_path, - message_type.to_string(), - ) - .unwrap(); + let message_descriptor = get_message_descriptor(&protobuf_desc_path, message_type).unwrap(); let deserializer = ProtobufDeserializer::new(message_descriptor); for namespace in [LogNamespace::Legacy, LogNamespace::Vector] { @@ -352,9 +333,9 @@ mod tests { #[test] fn deserialize_error_invalid_protobuf() { let input = Bytes::from("{ foo"); - let message_descriptor = ProtobufDeserializer::get_message_descriptor( + let message_descriptor = get_message_descriptor( &test_data_dir().join("test_protobuf.desc"), - "test_protobuf.Person".to_string(), + "test_protobuf.Person", ) .unwrap(); let deserializer = ProtobufDeserializer::new(message_descriptor); diff --git a/lib/codecs/src/encoding/format/protobuf.rs b/lib/codecs/src/encoding/format/protobuf.rs index db2f009570db9..6f58a6d92b1c0 100644 --- a/lib/codecs/src/encoding/format/protobuf.rs +++ b/lib/codecs/src/encoding/format/protobuf.rs @@ -1,8 +1,9 @@ +use crate::common::protobuf::get_message_descriptor; use crate::encoding::BuildError; use bytes::BytesMut; use prost::Message; -use prost_reflect::{DescriptorPool, DynamicMessage, FieldDescriptor, Kind, MessageDescriptor}; -use std::path::{Path, PathBuf}; +use prost_reflect::{DynamicMessage, FieldDescriptor, Kind, MessageDescriptor}; +use std::path::PathBuf; use tokio_util::codec::Encoder; use vector_core::{ config::DataType, @@ -10,22 +11,6 @@ use vector_core::{ schema, }; -fn get_message_descriptor( - descriptor_set_path: &Path, - message_type: &str, -) -> vector_common::Result { - let b = std::fs::read(descriptor_set_path).map_err(|e| { - format!("Failed to open protobuf desc file '{descriptor_set_path:?}': {e}",) - })?; - let pool = DescriptorPool::decode(b.as_slice()).map_err(|e| { - format!("Failed to parse protobuf desc file '{descriptor_set_path:?}': {e}") - })?; - pool.get_message_by_name(message_type).ok_or_else(|| { - format!("The message type '{message_type}' could not be found in '{descriptor_set_path:?}'") - .into() - }) -} - /// Config used to build a `ProtobufSerializer`. #[crate::configurable_component] #[derive(Debug, Clone)] diff --git a/lib/codecs/src/lib.rs b/lib/codecs/src/lib.rs index 0aa2c63fe1952..07adc18df67b3 100644 --- a/lib/codecs/src/lib.rs +++ b/lib/codecs/src/lib.rs @@ -4,6 +4,7 @@ #![deny(missing_docs)] #![deny(warnings)] +mod common; pub mod decoding; pub mod encoding; pub mod gelf; From 7be964b3cef85490882cf50827631227e83d086c Mon Sep 17 00:00:00 2001 From: Glen Oakley Date: Wed, 20 Sep 2023 23:58:24 +0000 Subject: [PATCH 08/17] add map encoding and test --- lib/codecs/src/encoding/format/protobuf.rs | 98 ++++++++++++++++++++-- lib/codecs/tests/data/protobuf/test.proto | 9 ++ 2 files changed, 102 insertions(+), 5 deletions(-) diff --git a/lib/codecs/src/encoding/format/protobuf.rs b/lib/codecs/src/encoding/format/protobuf.rs index 6f58a6d92b1c0..fd8c710d91dbb 100644 --- a/lib/codecs/src/encoding/format/protobuf.rs +++ b/lib/codecs/src/encoding/format/protobuf.rs @@ -2,7 +2,8 @@ use crate::common::protobuf::get_message_descriptor; use crate::encoding::BuildError; use bytes::BytesMut; use prost::Message; -use prost_reflect::{DynamicMessage, FieldDescriptor, Kind, MessageDescriptor}; +use prost_reflect::{DynamicMessage, FieldDescriptor, Kind, MapKey, MessageDescriptor}; +use std::collections::HashMap; use std::path::PathBuf; use tokio_util::codec::Encoder; use vector_core::{ @@ -70,6 +71,10 @@ fn convert_value_raw( kind: &prost_reflect::Kind, ) -> Result { let kind_str = value.kind_str().to_owned(); + eprintln!("{:?}", kind); + if let prost_reflect::Kind::Message(x) = kind { + eprintln!("{:?}", x); + } match (value, kind) { (Value::Boolean(b), Kind::Bool) => Ok(prost_reflect::Value::Bool(b)), (Value::Bytes(b), Kind::Bytes) => Ok(prost_reflect::Value::Bytes(b)), @@ -88,9 +93,29 @@ fn convert_value_raw( (Value::Integer(i), Kind::Uint64) => Ok(prost_reflect::Value::U64(i as u64)), (Value::Integer(i), Kind::Fixed32) => Ok(prost_reflect::Value::U32(i as u32)), (Value::Integer(i), Kind::Fixed64) => Ok(prost_reflect::Value::U64(i as u64)), - (Value::Object(o), Kind::Message(message_descriptor)) => Ok(prost_reflect::Value::Message( - encode_message(message_descriptor, Value::Object(o))?, - )), + (Value::Object(o), Kind::Message(message_descriptor)) => { + if message_descriptor.is_map_entry() { + let value_field = message_descriptor + .get_field_by_name("value") + .ok_or_else(|| "Internal error with proto map processing")?; + let mut map: HashMap = HashMap::new(); + for (key, val) in o.into_iter() { + match convert_value(&value_field, val) { + Ok(prost_val) => { + map.insert(MapKey::String(key), prost_val); + } + Err(e) => return Err(e), + } + } + Ok(prost_reflect::Value::Map(map)) + } else { + // if it's not a map, it's an actual message + Ok(prost_reflect::Value::Message(encode_message( + message_descriptor, + Value::Object(o), + )?)) + } + } (Value::Regex(r), Kind::String) => Ok(prost_reflect::Value::String(r.as_str().to_owned())), (Value::Regex(r), Kind::Bytes) => Ok(prost_reflect::Value::Bytes(r.as_bytes())), (Value::Timestamp(t), Kind::Int64) => Ok(prost_reflect::Value::I64(t.timestamp_micros())), @@ -129,6 +154,11 @@ fn encode_message( ) -> Result { let mut message = DynamicMessage::new(message_descriptor.clone()); if let Value::Object(map) = value { + eprintln!( + "{:?} {:?}", + message_descriptor.is_map_entry(), + message_descriptor.fields().next() + ); for field in message_descriptor.fields() { match map.get(field.name()) { None | Some(Value::Null) => message.clear_field(&field), @@ -176,7 +206,8 @@ mod tests { use super::*; use bytes::Bytes; use ordered_float::NotNan; - use std::collections::BTreeMap; + use prost_reflect::MapKey; + use std::collections::{BTreeMap, HashMap}; macro_rules! mfield { ($m:expr, $f:expr) => { @@ -237,6 +268,63 @@ mod tests { assert!(mfield!(message, "binary").as_bytes() == Some(&bytes)); } + #[test] + fn test_encode_map() { + let message = encode_message( + &test_message_descriptor("Map"), + Value::Object(BTreeMap::from([ + ( + "names".into(), + Value::Object(BTreeMap::from([ + ("forty-four".into(), Value::Integer(44)), + ("one".into(), Value::Integer(1)), + ])), + ), + ( + "people".into(), + Value::Object(BTreeMap::from([( + "mark".into(), + Value::Object(BTreeMap::from([ + ("nickname".into(), Value::Bytes(Bytes::from("jeff"))), + ("age".into(), Value::Integer(22)), + ])), + )])), + ), + ])), + ) + .unwrap(); + // the simpler string->primative map + assert_eq!( + Some(&HashMap::from([ + ( + MapKey::String("forty-four".into()), + prost_reflect::Value::I32(44), + ), + (MapKey::String("one".into()), prost_reflect::Value::I32(1),), + ])), + mfield!(message, "names").as_map() + ); + // the not-simpler string->message map + let people = mfield!(message, "people").as_map().unwrap().to_owned(); + assert_eq!(1, people.len()); + assert_eq!( + Some("jeff"), + mfield!( + people[&MapKey::String("mark".into())].as_message().unwrap(), + "nickname" + ) + .as_str() + ); + assert_eq!( + Some(22), + mfield!( + people[&MapKey::String("mark".into())].as_message().unwrap(), + "age" + ) + .as_u32() + ); + } + #[test] fn test_encode_repeated_primitive() { let message = encode_message( diff --git a/lib/codecs/tests/data/protobuf/test.proto b/lib/codecs/tests/data/protobuf/test.proto index 641fa4e83662e..882f075c9170f 100644 --- a/lib/codecs/tests/data/protobuf/test.proto +++ b/lib/codecs/tests/data/protobuf/test.proto @@ -19,6 +19,15 @@ message Bytes { bytes binary = 2; } +message Map { + message Person { + string nickname = 1; + uint32 age = 2; + }; + map names = 1; + map people = 2; +} + message RepeatedPrimitive { repeated int64 numbers = 1; } From af5a4e976fa52914ed3f7ace362408413711a582 Mon Sep 17 00:00:00 2001 From: Glen Oakley Date: Thu, 21 Sep 2023 00:13:23 +0000 Subject: [PATCH 09/17] add enum encoding and test --- lib/codecs/src/encoding/format/protobuf.rs | 48 +++++++++++++++++---- lib/codecs/tests/data/protobuf/test.desc | Bin 457 -> 920 bytes lib/codecs/tests/data/protobuf/test.proto | 11 +++++ 3 files changed, 50 insertions(+), 9 deletions(-) diff --git a/lib/codecs/src/encoding/format/protobuf.rs b/lib/codecs/src/encoding/format/protobuf.rs index fd8c710d91dbb..a6f3b60b236fa 100644 --- a/lib/codecs/src/encoding/format/protobuf.rs +++ b/lib/codecs/src/encoding/format/protobuf.rs @@ -71,16 +71,34 @@ fn convert_value_raw( kind: &prost_reflect::Kind, ) -> Result { let kind_str = value.kind_str().to_owned(); - eprintln!("{:?}", kind); - if let prost_reflect::Kind::Message(x) = kind { - eprintln!("{:?}", x); - } match (value, kind) { (Value::Boolean(b), Kind::Bool) => Ok(prost_reflect::Value::Bool(b)), (Value::Bytes(b), Kind::Bytes) => Ok(prost_reflect::Value::Bytes(b)), (Value::Bytes(b), Kind::String) => Ok(prost_reflect::Value::String( String::from_utf8_lossy(&b).into_owned(), )), + (Value::Bytes(b), Kind::Enum(descriptor)) => { + let string = String::from_utf8_lossy(&b).into_owned(); + // check for an exact enum name match + if let Some(d) = descriptor.values().filter(|v| v.name() == &string).next() { + return Ok(prost_reflect::Value::EnumNumber(d.number())); + } + // check for an enum name match while ignoring capitlization + if let Some(d) = descriptor + .values() + .filter(|v| v.name().eq_ignore_ascii_case(&string)) + .next() + { + return Ok(prost_reflect::Value::EnumNumber(d.number())); + } + // give up + Err(format!( + "Enum `{}` has no value that matches string '{}'", + descriptor.full_name(), + string + ) + .into()) + } (Value::Float(f), Kind::Double) => Ok(prost_reflect::Value::F64(f.into_inner())), (Value::Float(f), Kind::Float) => Ok(prost_reflect::Value::F32(f.into_inner() as f32)), (Value::Integer(i), Kind::Int32) => Ok(prost_reflect::Value::I32(i as i32)), @@ -93,6 +111,7 @@ fn convert_value_raw( (Value::Integer(i), Kind::Uint64) => Ok(prost_reflect::Value::U64(i as u64)), (Value::Integer(i), Kind::Fixed32) => Ok(prost_reflect::Value::U32(i as u32)), (Value::Integer(i), Kind::Fixed64) => Ok(prost_reflect::Value::U64(i as u64)), + (Value::Integer(i), Kind::Enum(_)) => Ok(prost_reflect::Value::EnumNumber(i as i32)), (Value::Object(o), Kind::Message(message_descriptor)) => { if message_descriptor.is_map_entry() { let value_field = message_descriptor @@ -154,11 +173,6 @@ fn encode_message( ) -> Result { let mut message = DynamicMessage::new(message_descriptor.clone()); if let Value::Object(map) = value { - eprintln!( - "{:?} {:?}", - message_descriptor.is_map_entry(), - message_descriptor.fields().next() - ); for field in message_descriptor.fields() { match map.get(field.name()) { None | Some(Value::Null) => message.clear_field(&field), @@ -325,6 +339,22 @@ mod tests { ); } + #[test] + fn test_encode_enum() { + let message = encode_message( + &test_message_descriptor("Enum"), + Value::Object(BTreeMap::from([ + ("breakfast".into(), Value::Bytes(Bytes::from("tomato"))), + ("dinner".into(), Value::Bytes(Bytes::from("OLIVE"))), + ("lunch".into(), Value::Integer(0)), + ])), + ) + .unwrap(); + assert_eq!(Some(2), mfield!(message, "breakfast").as_enum_number()); + assert_eq!(Some(0), mfield!(message, "lunch").as_enum_number()); + assert_eq!(Some(1), mfield!(message, "dinner").as_enum_number()); + } + #[test] fn test_encode_repeated_primitive() { let message = encode_message( diff --git a/lib/codecs/tests/data/protobuf/test.desc b/lib/codecs/tests/data/protobuf/test.desc index 3e1b6fcc085a89f06e91800691d077f4bc3694b6..4544bb337a07bed9bb2346c27b72aa8f85a1412b 100644 GIT binary patch delta 501 zcmX@fJcFH|YbraJTuEwiNwI!PVoBmeiBt9SnYfsJ6AOg2xLETNb5n~Y7!{Z`xQ#^g zK+5!hGJ1X>Vb{EpqRJqM5+PkKwu03Bf}B(dCa_{rgyI0OI7Bs6k(3!1TR>`2aekhV z6c*q`}3Ol9`v6S|q^? z^gcF~P^HLLY!QzjsXEat^y1oroWG8n5zJz5EmC)h`+C6h`#{S#D^~cO)rTo delta 36 scmbQiev(;&>libadP!<=NwI!PVo9QYYF=`FN@iaAME+BgH!?j30N&XRkpKVy diff --git a/lib/codecs/tests/data/protobuf/test.proto b/lib/codecs/tests/data/protobuf/test.proto index 882f075c9170f..06fc7e4d384e4 100644 --- a/lib/codecs/tests/data/protobuf/test.proto +++ b/lib/codecs/tests/data/protobuf/test.proto @@ -28,6 +28,17 @@ message Map { map people = 2; } +message Enum { + enum Fruit { + APPLE = 0; + OLIVE = 1; + TOMATO = 2; + } + Fruit breakfast = 1; + Fruit lunch = 2; + Fruit dinner = 3; +} + message RepeatedPrimitive { repeated int64 numbers = 1; } From 1d3a498c958b0724a4832f1d263aea86d72a9392 Mon Sep 17 00:00:00 2001 From: Glen Oakley Date: Thu, 21 Sep 2023 17:08:11 +0000 Subject: [PATCH 10/17] add timestamp encoding and test --- lib/codecs/src/encoding/format/protobuf.rs | 29 +++++++++++++++++++++ lib/codecs/tests/data/protobuf/test.desc | Bin 920 -> 1258 bytes lib/codecs/tests/data/protobuf/test.proto | 9 +++++++ 3 files changed, 38 insertions(+) diff --git a/lib/codecs/src/encoding/format/protobuf.rs b/lib/codecs/src/encoding/format/protobuf.rs index a6f3b60b236fa..bf335c56a3932 100644 --- a/lib/codecs/src/encoding/format/protobuf.rs +++ b/lib/codecs/src/encoding/format/protobuf.rs @@ -1,6 +1,7 @@ use crate::common::protobuf::get_message_descriptor; use crate::encoding::BuildError; use bytes::BytesMut; +use chrono::Timelike; use prost::Message; use prost_reflect::{DynamicMessage, FieldDescriptor, Kind, MapKey, MessageDescriptor}; use std::collections::HashMap; @@ -138,6 +139,15 @@ fn convert_value_raw( (Value::Regex(r), Kind::String) => Ok(prost_reflect::Value::String(r.as_str().to_owned())), (Value::Regex(r), Kind::Bytes) => Ok(prost_reflect::Value::Bytes(r.as_bytes())), (Value::Timestamp(t), Kind::Int64) => Ok(prost_reflect::Value::I64(t.timestamp_micros())), + (Value::Timestamp(t), Kind::Message(descriptor)) + if descriptor.full_name() == "google.protobuf.Timestamp" => + { + let mut message = DynamicMessage::new(descriptor.clone()); + message.try_set_field_by_name("seconds", prost_reflect::Value::I64(t.timestamp()))?; + message + .try_set_field_by_name("nanos", prost_reflect::Value::I32(t.nanosecond() as i32))?; + Ok(prost_reflect::Value::Message(message)) + } _ => Err(format!("Cannot encode vector `{kind_str}` into protobuf `{kind:?}`",).into()), } } @@ -219,6 +229,7 @@ impl Encoder for ProtobufSerializer { mod tests { use super::*; use bytes::Bytes; + use chrono::{DateTime, NaiveDateTime, Utc}; use ordered_float::NotNan; use prost_reflect::MapKey; use std::collections::{BTreeMap, HashMap}; @@ -355,6 +366,24 @@ mod tests { assert_eq!(Some(1), mfield!(message, "dinner").as_enum_number()); } + #[test] + fn test_encode_timestamp() { + let message = encode_message( + &test_message_descriptor("Timestamp"), + Value::Object(BTreeMap::from([( + "morning".into(), + Value::Timestamp(DateTime::from_naive_utc_and_offset( + NaiveDateTime::from_timestamp_opt(8675, 309).unwrap(), + Utc, + )), + )])), + ) + .unwrap(); + let timestamp = mfield!(message, "morning").as_message().unwrap().clone(); + assert_eq!(Some(8675), mfield!(timestamp, "seconds").as_i64()); + assert_eq!(Some(309), mfield!(timestamp, "nanos").as_i32()); + } + #[test] fn test_encode_repeated_primitive() { let message = encode_message( diff --git a/lib/codecs/tests/data/protobuf/test.desc b/lib/codecs/tests/data/protobuf/test.desc index 4544bb337a07bed9bb2346c27b72aa8f85a1412b..f12bfa7d889b8237c0cbb34d81ce79b716907f2a 100644 GIT binary patch delta 394 zcmbQi{))4n>pvrxe0qL+oZh;~!8 zoK(1}PJD1P13)ebV2mVb}&t5W_b<( D6G{_A diff --git a/lib/codecs/tests/data/protobuf/test.proto b/lib/codecs/tests/data/protobuf/test.proto index 06fc7e4d384e4..8e3275b7e5394 100644 --- a/lib/codecs/tests/data/protobuf/test.proto +++ b/lib/codecs/tests/data/protobuf/test.proto @@ -1,7 +1,12 @@ +// Remember to recompile `test.desc` when you update this file: +// protoc -I . -o test.desc test.proto google/protobuf/timestamp.proto + syntax = "proto3"; package test; +import "google/protobuf/timestamp.proto"; + message Integers { int32 i32 = 1; int64 i64 = 2; @@ -39,6 +44,10 @@ message Enum { Fruit dinner = 3; } +message Timestamp { + google.protobuf.Timestamp morning = 1; +} + message RepeatedPrimitive { repeated int64 numbers = 1; } From 82aac3c02bf6f0d6c4939df86d9ee96bd769115d Mon Sep 17 00:00:00 2001 From: Glen Oakley Date: Thu, 21 Sep 2023 17:49:27 +0000 Subject: [PATCH 11/17] fix spelling again --- lib/codecs/src/encoding/format/protobuf.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/codecs/src/encoding/format/protobuf.rs b/lib/codecs/src/encoding/format/protobuf.rs index bf335c56a3932..e0919a63adfbd 100644 --- a/lib/codecs/src/encoding/format/protobuf.rs +++ b/lib/codecs/src/encoding/format/protobuf.rs @@ -84,7 +84,7 @@ fn convert_value_raw( if let Some(d) = descriptor.values().filter(|v| v.name() == &string).next() { return Ok(prost_reflect::Value::EnumNumber(d.number())); } - // check for an enum name match while ignoring capitlization + // check for an enum name match while ignoring capitalization if let Some(d) = descriptor .values() .filter(|v| v.name().eq_ignore_ascii_case(&string)) @@ -318,7 +318,7 @@ mod tests { ])), ) .unwrap(); - // the simpler string->primative map + // the simpler string->primitive map assert_eq!( Some(&HashMap::from([ ( From 473177c1a12465edf1d3ec5f0794e8e552f57895 Mon Sep 17 00:00:00 2001 From: Glen Oakley Date: Tue, 26 Sep 2023 22:39:39 +0000 Subject: [PATCH 12/17] assert -> assert_eq --- lib/codecs/src/encoding/format/protobuf.rs | 46 ++++++++++++++-------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/lib/codecs/src/encoding/format/protobuf.rs b/lib/codecs/src/encoding/format/protobuf.rs index e0919a63adfbd..32fd7d47262b3 100644 --- a/lib/codecs/src/encoding/format/protobuf.rs +++ b/lib/codecs/src/encoding/format/protobuf.rs @@ -258,10 +258,10 @@ mod tests { ])), ) .unwrap(); - assert!(mfield!(message, "i32").as_i32() == Some(-1234)); - assert!(mfield!(message, "i64").as_i64() == Some(-9876)); - assert!(mfield!(message, "u32").as_u32() == Some(1234)); - assert!(mfield!(message, "u64").as_u64() == Some(9876)); + assert_eq!(Some(-1234), mfield!(message, "i32").as_i32()); + assert_eq!(Some(-9876), mfield!(message, "i64").as_i64()); + assert_eq!(Some(1234), mfield!(message, "u32").as_u32()); + assert_eq!(Some(9876), mfield!(message, "u64").as_u64()); } #[test] @@ -274,8 +274,8 @@ mod tests { ])), ) .unwrap(); - assert!(mfield!(message, "d").as_f64() == Some(11.0)); - assert!(mfield!(message, "f").as_f32() == Some(2.0)); + assert_eq!(Some(11.0), mfield!(message, "d").as_f64()); + assert_eq!(Some(2.0), mfield!(message, "f").as_f32()); } #[test] @@ -289,8 +289,8 @@ mod tests { ])), ) .unwrap(); - assert!(mfield!(message, "text").as_str() == Some("vector")); - assert!(mfield!(message, "binary").as_bytes() == Some(&bytes)); + assert_eq!(Some("vector"), mfield!(message, "text").as_str()); + assert_eq!(Some(&bytes), mfield!(message, "binary").as_bytes()); } #[test] @@ -399,10 +399,10 @@ mod tests { ) .unwrap(); let list = mfield!(message, "numbers").as_list().unwrap().to_vec(); - assert!(list.len() == 3); - assert!(list[0].as_i64() == Some(8)); - assert!(list[1].as_i64() == Some(6)); - assert!(list[2].as_i64() == Some(4)); + assert_eq!(3, list.len()); + assert_eq!(Some(8), list[0].as_i64()); + assert_eq!(Some(6), list[1].as_i64()); + assert_eq!(Some(4), list[2].as_i64()); } #[test] @@ -426,13 +426,25 @@ mod tests { ) .unwrap(); let list = mfield!(message, "messages").as_list().unwrap().to_vec(); - assert!(list.len() == 3); - assert!(mfield!(list[0].as_message().unwrap(), "text").as_str() == Some("vector")); + assert_eq!(3, list.len()); + assert_eq!( + Some("vector"), + mfield!(list[0].as_message().unwrap(), "text").as_str() + ); assert!(!list[0].as_message().unwrap().has_field_by_name("index")); assert!(!list[1].as_message().unwrap().has_field_by_name("t4ext")); - assert!(mfield!(list[1].as_message().unwrap(), "index").as_u32() == Some(4444)); - assert!(mfield!(list[2].as_message().unwrap(), "text").as_str() == Some("protobuf")); - assert!(mfield!(list[2].as_message().unwrap(), "index").as_u32() == Some(1)); + assert_eq!( + Some(4444), + mfield!(list[1].as_message().unwrap(), "index").as_u32() + ); + assert_eq!( + Some("protobuf"), + mfield!(list[2].as_message().unwrap(), "text").as_str() + ); + assert_eq!( + Some(1), + mfield!(list[2].as_message().unwrap(), "index").as_u32() + ); } fn run_encoding_on_decoding_test_data( From d16507e6bad928ed53af9d6a243a62e572c5c7b3 Mon Sep 17 00:00:00 2001 From: Glen Oakley Date: Tue, 26 Sep 2023 22:40:19 +0000 Subject: [PATCH 13/17] remove simple comments --- lib/codecs/src/encoding/format/protobuf.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/codecs/src/encoding/format/protobuf.rs b/lib/codecs/src/encoding/format/protobuf.rs index 32fd7d47262b3..ef73c24fa4d6a 100644 --- a/lib/codecs/src/encoding/format/protobuf.rs +++ b/lib/codecs/src/encoding/format/protobuf.rs @@ -467,12 +467,8 @@ mod tests { #[test] fn test_encode_decoding_protobuf_test_data() { - // test_protobuf (proto2) // just check for the side-effect of success run_encoding_on_decoding_test_data("test_protobuf.desc", "test_protobuf.Person").unwrap(); - - // test_protobuf (proto3) - // just check for the side-effect of success run_encoding_on_decoding_test_data("test_protobuf3.desc", "test_protobuf3.Person").unwrap(); } } From 3759e3430b1995855aa72d0853397d7e6304113b Mon Sep 17 00:00:00 2001 From: Glen Oakley Date: Tue, 26 Sep 2023 22:42:14 +0000 Subject: [PATCH 14/17] enums are not case-sensitive --- lib/codecs/src/encoding/format/protobuf.rs | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/lib/codecs/src/encoding/format/protobuf.rs b/lib/codecs/src/encoding/format/protobuf.rs index ef73c24fa4d6a..751d8161f3c0d 100644 --- a/lib/codecs/src/encoding/format/protobuf.rs +++ b/lib/codecs/src/encoding/format/protobuf.rs @@ -80,25 +80,20 @@ fn convert_value_raw( )), (Value::Bytes(b), Kind::Enum(descriptor)) => { let string = String::from_utf8_lossy(&b).into_owned(); - // check for an exact enum name match - if let Some(d) = descriptor.values().filter(|v| v.name() == &string).next() { - return Ok(prost_reflect::Value::EnumNumber(d.number())); - } - // check for an enum name match while ignoring capitalization if let Some(d) = descriptor .values() .filter(|v| v.name().eq_ignore_ascii_case(&string)) .next() { return Ok(prost_reflect::Value::EnumNumber(d.number())); + } else { + Err(format!( + "Enum `{}` has no value that matches string '{}'", + descriptor.full_name(), + string + ) + .into()) } - // give up - Err(format!( - "Enum `{}` has no value that matches string '{}'", - descriptor.full_name(), - string - ) - .into()) } (Value::Float(f), Kind::Double) => Ok(prost_reflect::Value::F64(f.into_inner())), (Value::Float(f), Kind::Float) => Ok(prost_reflect::Value::F32(f.into_inner() as f32)), From 331542fa0c42cf31a11a7c875d51f3064b7e3bbd Mon Sep 17 00:00:00 2001 From: Glen Oakley Date: Tue, 26 Sep 2023 23:26:44 +0000 Subject: [PATCH 15/17] add round trip test --- .../protobuf/google/protobuf/timestamp.proto | 147 ++++++++++++++++++ lib/codecs/tests/protobuf.rs | 67 ++++++++ 2 files changed, 214 insertions(+) create mode 100644 lib/codecs/tests/data/protobuf/google/protobuf/timestamp.proto create mode 100644 lib/codecs/tests/protobuf.rs diff --git a/lib/codecs/tests/data/protobuf/google/protobuf/timestamp.proto b/lib/codecs/tests/data/protobuf/google/protobuf/timestamp.proto new file mode 100644 index 0000000000000..3b2df6d91168e --- /dev/null +++ b/lib/codecs/tests/data/protobuf/google/protobuf/timestamp.proto @@ -0,0 +1,147 @@ +// Protocol Buffers - Google's data interchange format +// Copyright 2008 Google Inc. All rights reserved. +// https://developers.google.com/protocol-buffers/ +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +package google.protobuf; + +option csharp_namespace = "Google.Protobuf.WellKnownTypes"; +option cc_enable_arenas = true; +option go_package = "google.golang.org/protobuf/types/known/timestamppb"; +option java_package = "com.google.protobuf"; +option java_outer_classname = "TimestampProto"; +option java_multiple_files = true; +option objc_class_prefix = "GPB"; + +// A Timestamp represents a point in time independent of any time zone or local +// calendar, encoded as a count of seconds and fractions of seconds at +// nanosecond resolution. The count is relative to an epoch at UTC midnight on +// January 1, 1970, in the proleptic Gregorian calendar which extends the +// Gregorian calendar backwards to year one. +// +// All minutes are 60 seconds long. Leap seconds are "smeared" so that no leap +// second table is needed for interpretation, using a [24-hour linear +// smear](https://developers.google.com/time/smear). +// +// The range is from 0001-01-01T00:00:00Z to 9999-12-31T23:59:59.999999999Z. By +// restricting to that range, we ensure that we can convert to and from [RFC +// 3339](https://www.ietf.org/rfc/rfc3339.txt) date strings. +// +// # Examples +// +// Example 1: Compute Timestamp from POSIX `time()`. +// +// Timestamp timestamp; +// timestamp.set_seconds(time(NULL)); +// timestamp.set_nanos(0); +// +// Example 2: Compute Timestamp from POSIX `gettimeofday()`. +// +// struct timeval tv; +// gettimeofday(&tv, NULL); +// +// Timestamp timestamp; +// timestamp.set_seconds(tv.tv_sec); +// timestamp.set_nanos(tv.tv_usec * 1000); +// +// Example 3: Compute Timestamp from Win32 `GetSystemTimeAsFileTime()`. +// +// FILETIME ft; +// GetSystemTimeAsFileTime(&ft); +// UINT64 ticks = (((UINT64)ft.dwHighDateTime) << 32) | ft.dwLowDateTime; +// +// // A Windows tick is 100 nanoseconds. Windows epoch 1601-01-01T00:00:00Z +// // is 11644473600 seconds before Unix epoch 1970-01-01T00:00:00Z. +// Timestamp timestamp; +// timestamp.set_seconds((INT64) ((ticks / 10000000) - 11644473600LL)); +// timestamp.set_nanos((INT32) ((ticks % 10000000) * 100)); +// +// Example 4: Compute Timestamp from Java `System.currentTimeMillis()`. +// +// long millis = System.currentTimeMillis(); +// +// Timestamp timestamp = Timestamp.newBuilder().setSeconds(millis / 1000) +// .setNanos((int) ((millis % 1000) * 1000000)).build(); +// +// +// Example 5: Compute Timestamp from Java `Instant.now()`. +// +// Instant now = Instant.now(); +// +// Timestamp timestamp = +// Timestamp.newBuilder().setSeconds(now.getEpochSecond()) +// .setNanos(now.getNano()).build(); +// +// +// Example 6: Compute Timestamp from current time in Python. +// +// timestamp = Timestamp() +// timestamp.GetCurrentTime() +// +// # JSON Mapping +// +// In JSON format, the Timestamp type is encoded as a string in the +// [RFC 3339](https://www.ietf.org/rfc/rfc3339.txt) format. That is, the +// format is "{year}-{month}-{day}T{hour}:{min}:{sec}[.{frac_sec}]Z" +// where {year} is always expressed using four digits while {month}, {day}, +// {hour}, {min}, and {sec} are zero-padded to two digits each. The fractional +// seconds, which can go up to 9 digits (i.e. up to 1 nanosecond resolution), +// are optional. The "Z" suffix indicates the timezone ("UTC"); the timezone +// is required. A proto3 JSON serializer should always use UTC (as indicated by +// "Z") when printing the Timestamp type and a proto3 JSON parser should be +// able to accept both UTC and other timezones (as indicated by an offset). +// +// For example, "2017-01-15T01:30:15.01Z" encodes 15.01 seconds past +// 01:30 UTC on January 15, 2017. +// +// In JavaScript, one can convert a Date object to this format using the +// standard +// [toISOString()](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Date/toISOString) +// method. In Python, a standard `datetime.datetime` object can be converted +// to this format using +// [`strftime`](https://docs.python.org/2/library/time.html#time.strftime) with +// the time format spec '%Y-%m-%dT%H:%M:%S.%fZ'. Likewise, in Java, one can use +// the Joda Time's [`ISODateTimeFormat.dateTime()`]( +// http://www.joda.org/joda-time/apidocs/org/joda/time/format/ISODateTimeFormat.html#dateTime%2D%2D +// ) to obtain a formatter capable of generating timestamps in this format. +// +// +message Timestamp { + // Represents seconds of UTC time since Unix epoch + // 1970-01-01T00:00:00Z. Must be from 0001-01-01T00:00:00Z to + // 9999-12-31T23:59:59Z inclusive. + int64 seconds = 1; + + // Non-negative fractions of a second at nanosecond resolution. Negative + // second values with fractions must still have non-negative nanos values + // that count forward in time. Must be from 0 to 999,999,999 + // inclusive. + int32 nanos = 2; +} diff --git a/lib/codecs/tests/protobuf.rs b/lib/codecs/tests/protobuf.rs new file mode 100644 index 0000000000000..6c4b411416b71 --- /dev/null +++ b/lib/codecs/tests/protobuf.rs @@ -0,0 +1,67 @@ +//! Tests for the behaviour of Protobuf serializers and deserializers (together). + +use bytes::{Bytes, BytesMut}; +use std::path::{Path, PathBuf}; +use tokio_util::codec::Encoder; +use vector_core::config::LogNamespace; + +use codecs::decoding::format::Deserializer; +use codecs::decoding::{ + ProtobufDeserializer, ProtobufDeserializerConfig, ProtobufDeserializerOptions, +}; +use codecs::encoding::{ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions}; + +fn test_data_dir() -> PathBuf { + PathBuf::from(std::env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf") +} + +fn read_protobuf_bin_message(path: &Path) -> Bytes { + let message_raw = std::fs::read(path).unwrap(); + Bytes::copy_from_slice(&message_raw) +} + +/// Build the serializer and deserializer from common settings +fn build_rializers( + desc_file: PathBuf, + message_type: String, +) -> (ProtobufSerializer, ProtobufDeserializer) { + let serializer = ProtobufSerializerConfig { + protobuf: ProtobufSerializerOptions { + desc_file: desc_file.clone(), + message_type: message_type.clone(), + }, + } + .build() + .unwrap(); + let deserializer = ProtobufDeserializerConfig { + protobuf: ProtobufDeserializerOptions { + desc_file, + message_type, + }, + } + .build() + .unwrap(); + (serializer, deserializer) +} + +#[test] +fn roundtrip_coding() { + let protobuf_message = read_protobuf_bin_message(&test_data_dir().join("person_someone.pb")); + let desc_file = test_data_dir().join("test_protobuf.desc"); + let message_type: String = "test_protobuf.Person".into(); + let (mut serializer, deserializer) = build_rializers(desc_file, message_type); + + let events_original = deserializer + .parse(protobuf_message, LogNamespace::Vector) + .unwrap(); + assert_eq!(1, events_original.len()); + let mut new_message = BytesMut::new(); + serializer + .encode(events_original[0].clone(), &mut new_message) + .unwrap(); + let protobuf_message: Bytes = new_message.into(); + let events_encoded = deserializer + .parse(protobuf_message, LogNamespace::Vector) + .unwrap(); + assert_eq!(events_original, events_encoded); +} From 0abcc4cf244f0b8e9b7d93ef05bb23745f6d7bed Mon Sep 17 00:00:00 2001 From: Glen Oakley Date: Wed, 27 Sep 2023 17:48:46 +0000 Subject: [PATCH 16/17] spelling (maybe?) --- lib/codecs/tests/protobuf.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/codecs/tests/protobuf.rs b/lib/codecs/tests/protobuf.rs index 6c4b411416b71..92acbffd9c3a8 100644 --- a/lib/codecs/tests/protobuf.rs +++ b/lib/codecs/tests/protobuf.rs @@ -1,4 +1,4 @@ -//! Tests for the behaviour of Protobuf serializers and deserializers (together). +//! Tests for the behaviour of Protobuf serializer and deserializer (together). use bytes::{Bytes, BytesMut}; use std::path::{Path, PathBuf}; @@ -21,7 +21,7 @@ fn read_protobuf_bin_message(path: &Path) -> Bytes { } /// Build the serializer and deserializer from common settings -fn build_rializers( +fn build_serializer_pair( desc_file: PathBuf, message_type: String, ) -> (ProtobufSerializer, ProtobufDeserializer) { @@ -49,7 +49,7 @@ fn roundtrip_coding() { let protobuf_message = read_protobuf_bin_message(&test_data_dir().join("person_someone.pb")); let desc_file = test_data_dir().join("test_protobuf.desc"); let message_type: String = "test_protobuf.Person".into(); - let (mut serializer, deserializer) = build_rializers(desc_file, message_type); + let (mut serializer, deserializer) = build_serializer_pair(desc_file, message_type); let events_original = deserializer .parse(protobuf_message, LogNamespace::Vector) From 62b1f38b254160afe0081128a3028d2f20effe06 Mon Sep 17 00:00:00 2001 From: Glen Oakley Date: Wed, 27 Sep 2023 17:51:03 +0000 Subject: [PATCH 17/17] fix clippy errors --- lib/codecs/src/encoding/format/protobuf.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/codecs/src/encoding/format/protobuf.rs b/lib/codecs/src/encoding/format/protobuf.rs index 751d8161f3c0d..a67a631edf3eb 100644 --- a/lib/codecs/src/encoding/format/protobuf.rs +++ b/lib/codecs/src/encoding/format/protobuf.rs @@ -82,10 +82,9 @@ fn convert_value_raw( let string = String::from_utf8_lossy(&b).into_owned(); if let Some(d) = descriptor .values() - .filter(|v| v.name().eq_ignore_ascii_case(&string)) - .next() + .find(|v| v.name().eq_ignore_ascii_case(&string)) { - return Ok(prost_reflect::Value::EnumNumber(d.number())); + Ok(prost_reflect::Value::EnumNumber(d.number())) } else { Err(format!( "Enum `{}` has no value that matches string '{}'", @@ -112,7 +111,7 @@ fn convert_value_raw( if message_descriptor.is_map_entry() { let value_field = message_descriptor .get_field_by_name("value") - .ok_or_else(|| "Internal error with proto map processing")?; + .ok_or("Internal error with proto map processing")?; let mut map: HashMap = HashMap::new(); for (key, val) in o.into_iter() { match convert_value(&value_field, val) {