diff --git a/src/sinks/websocket/sink.rs b/src/sinks/websocket/sink.rs index 6acbf99967b43..60f0099c94884 100644 --- a/src/sinks/websocket/sink.rs +++ b/src/sinks/websocket/sink.rs @@ -236,6 +236,17 @@ impl WebSocketSink { Ok(()) } + const fn should_encode_as_binary(&self) -> bool { + use codecs::encoding::Serializer::{ + Avro, Csv, Gelf, Json, Logfmt, Native, NativeJson, RawMessage, Text, + }; + + match self.encoder.serializer() { + RawMessage(_) | Avro(_) | Native(_) => true, + Csv(_) | Logfmt(_) | Gelf(_) | Json(_) | Text(_) | NativeJson(_) => false, + } + } + async fn handle_events( &mut self, input: &mut I, @@ -261,6 +272,7 @@ impl WebSocketSink { let bytes_sent = register!(BytesSent::from(Protocol("websocket".into()))); let events_sent = register!(EventsSent::from(Output(None))); + let encode_as_binary = self.should_encode_as_binary(); loop { let result = tokio::select! { @@ -301,7 +313,12 @@ impl WebSocketSink { Ok(()) => { finalizers.update_status(EventStatus::Delivered); - let message = Message::text(String::from_utf8_lossy(&bytes)); + let message = if encode_as_binary { + Message::binary(bytes) + } + else { + Message::text(String::from_utf8_lossy(&bytes)) + }; let message_len = message.len(); ws_sink.send(message).await.map(|_| {