Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/codecs/src/decoding/framing/newline_delimited.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct NewlineDelimitedDecoderOptions {
/// consider setting the maximum length to a reasonably large value as a safety net. This
/// ensures that processing is not actually unbounded.
#[serde(skip_serializing_if = "vector_core::serde::skip_serializing_if_default")]
max_length: Option<usize>,
pub max_length: Option<usize>,
}

impl NewlineDelimitedDecoderOptions {
Expand Down
2 changes: 1 addition & 1 deletion lib/codecs/src/decoding/framing/octet_counting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl OctetCountingDecoderConfig {
pub struct OctetCountingDecoderOptions {
/// The maximum length of the byte buffer.
#[serde(skip_serializing_if = "vector_core::serde::skip_serializing_if_default")]
max_length: Option<usize>,
pub max_length: Option<usize>,
}

/// Codec using the `Octet Counting` format as specified in
Expand Down
72 changes: 25 additions & 47 deletions src/sources/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub mod udp;
#[cfg(unix)]
mod unix;

use codecs::{decoding::DeserializerConfig, NewlineDelimitedDecoderConfig};
use codecs::decoding::DeserializerConfig;
use lookup::{lookup_v2::OptionalValuePath, owned_value_path};
use value::{kind::Collection, Kind};
use vector_config::configurable_component;
Expand Down Expand Up @@ -113,26 +113,18 @@ impl SourceConfig for SocketConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
match self.mode.clone() {
Mode::Tcp(config) => {
let decoding = config.decoding().clone();
// TODO: in v0.30.0 , remove the `max_length` setting from
// the UnixConfig, and all of the below mess and replace
// it with the configured framing /
// decoding.default_stream_framing().
let framing = match (config.framing().clone(), config.max_length()) {
(Some(framing), Some(_)) => {
warn!(message = "DEPRECATION: The `max_length` setting is deprecated and will be removed in an upcoming release. Since a `framing` setting was provided, the `max_length` setting has no effect.");
framing
}
(Some(framing), None) => framing,
(None, Some(max_length)) => {
warn!(message = "DEPRECATION: The `max_length` setting is deprecated and will be removed in an upcoming release. Please configure the `max_length` from the `framing` setting instead.");
NewlineDelimitedDecoderConfig::new_with_max_length(max_length).into()
}
(None, None) => decoding.default_stream_framing(),
};

let log_namespace = cx.log_namespace(config.log_namespace);
let decoder = DecodingConfig::new(framing, decoding, log_namespace).build();

let decoding = config.decoding().clone();
let decoder = DecodingConfig::new(
config
.framing
.clone()
.unwrap_or_else(|| decoding.default_stream_framing()),
decoding,
log_namespace,
)
.build();

let tcp = tcp::RawTcpSource::new(config.clone(), decoder, log_namespace);
let tls_config = config.tls().as_ref().map(|tls| tls.tls_config.clone());
Expand Down Expand Up @@ -190,27 +182,18 @@ impl SourceConfig for SocketConfig {
}
#[cfg(unix)]
Mode::UnixStream(config) => {
let decoding = config.decoding.clone();

// TODO: in v0.30.0 , remove the `max_length` setting from
// the UnixConfig, and all of the below mess and replace
// it with the configured framing /
// decoding.default_stream_framing().
let framing = match (config.framing.clone(), config.max_length) {
(Some(framing), Some(_)) => {
warn!(message = "DEPRECATION: The `max_length` setting is deprecated and will be removed in an upcoming release. Since a `framing` setting was provided, the `max_length` setting has no effect.");
framing
}
(Some(framing), None) => framing,
(None, Some(max_length)) => {
warn!(message = "DEPRECATION: The `max_length` setting is deprecated and will be removed in an upcoming release. Please configure the `max_length` from the `framing` setting instead.");
NewlineDelimitedDecoderConfig::new_with_max_length(max_length).into()
}
(None, None) => decoding.default_stream_framing(),
};

let log_namespace = cx.log_namespace(config.log_namespace);
let decoder = DecodingConfig::new(framing, decoding, log_namespace).build();

let decoding = config.decoding().clone();
let decoder = DecodingConfig::new(
config
.framing
.clone()
.unwrap_or_else(|| decoding.default_stream_framing()),
decoding,
log_namespace,
)
.build();

unix::unix_stream(config, decoder, cx.shutdown, cx.out, log_namespace)
}
Expand Down Expand Up @@ -335,10 +318,6 @@ pub(crate) fn default_host_key() -> OptionalValuePath {
OptionalValuePath::from(owned_value_path!(log_schema().host_key()))
}

fn default_max_length() -> Option<usize> {
Some(crate::serde::default_max_length())
}

#[cfg(test)]
mod test {
use approx::assert_relative_eq;
Expand Down Expand Up @@ -529,7 +508,6 @@ mod test {
let addr = next_addr();

let mut config = TcpConfig::from_address(addr.into());
config.set_max_length(None);
config.set_framing(Some(
NewlineDelimitedDecoderConfig::new_with_max_length(10).into(),
));
Expand Down Expand Up @@ -1026,7 +1004,7 @@ mod test {
let (tx, rx) = SourceSender::new_test();
let address = next_addr();
let mut config = UdpConfig::from_address(address.into());
config.max_length = Some(11);
config.max_length = 11;
let address = init_udp_with_config(tx, config).await;

send_lines_udp(
Expand Down Expand Up @@ -1062,7 +1040,7 @@ mod test {
let (tx, rx) = SourceSender::new_test();
let address = next_addr();
let mut config = UdpConfig::from_address(address.into());
config.max_length = Some(10);
config.max_length = 10;
config.framing = CharacterDelimitedDecoderConfig {
character_delimited: CharacterDelimitedDecoderOptions::new(b',', None),
}
Expand Down
26 changes: 3 additions & 23 deletions src/sources/socket/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
tls::TlsSourceConfig,
};

use super::{default_host_key, default_max_length, SocketConfig};
use super::{default_host_key, SocketConfig};

/// TCP configuration for the `socket` source.
#[serde_as]
Expand All @@ -30,16 +30,6 @@ pub struct TcpConfig {
#[configurable(derived)]
keepalive: Option<TcpKeepaliveConfig>,

/// The maximum buffer size of incoming messages.
///
/// Messages larger than this are truncated.
// TODO: communicated as deprecated in v0.29.0, can be removed in v0.30.0
#[configurable(
deprecated = "This option has been deprecated. Configure `max_length` on the framing config instead."
)]
#[configurable(metadata(docs::type_unit = "bytes"))]
max_length: Option<usize>,

/// The timeout before a connection is forcefully closed during shutdown.
#[serde(default = "default_shutdown_timeout_secs")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
Expand Down Expand Up @@ -85,11 +75,11 @@ pub struct TcpConfig {
pub connection_limit: Option<u32>,

#[configurable(derived)]
framing: Option<FramingConfig>,
pub(super) framing: Option<FramingConfig>,

#[configurable(derived)]
#[serde(default = "default_decoding")]
decoding: DeserializerConfig,
pub(super) decoding: DeserializerConfig,

/// The namespace to use for logs. This overrides the global setting.
#[serde(default)]
Expand All @@ -110,7 +100,6 @@ impl TcpConfig {
Self {
address,
keepalive: None,
max_length: default_max_length(),
shutdown_timeout_secs: default_shutdown_timeout_secs(),
host_key: default_host_key(),
port_key: default_port_key(),
Expand Down Expand Up @@ -152,10 +141,6 @@ impl TcpConfig {
self.keepalive
}

pub const fn max_length(&self) -> Option<usize> {
self.max_length
}

pub const fn shutdown_timeout_secs(&self) -> Duration {
self.shutdown_timeout_secs
}
Expand All @@ -173,11 +158,6 @@ impl TcpConfig {
self
}

pub fn set_max_length(&mut self, val: Option<usize>) -> &mut Self {
self.max_length = val;
self
}

pub fn set_shutdown_timeout_secs(&mut self, val: u64) -> &mut Self {
self.shutdown_timeout_secs = Duration::from_secs(val);
self
Expand Down
10 changes: 4 additions & 6 deletions src/sources/socket/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub struct UdpConfig {
/// Messages larger than this are truncated.
#[serde(default = "default_max_length")]
#[configurable(metadata(docs::type_unit = "bytes"))]
pub(super) max_length: Option<usize>,
pub(super) max_length: usize,

/// Overrides the name of the log field used to add the peer host to each event.
///
Expand Down Expand Up @@ -95,8 +95,8 @@ fn default_port_key() -> OptionalValuePath {
OptionalValuePath::from(owned_value_path!("port"))
}

fn default_max_length() -> Option<usize> {
Some(crate::serde::default_max_length())
fn default_max_length() -> usize {
crate::serde::default_max_length()
}

impl UdpConfig {
Expand Down Expand Up @@ -163,9 +163,7 @@ pub(super) fn udp(
}
}

let mut max_length = config
.max_length
.unwrap_or_else(|| default_max_length().unwrap());
let mut max_length = config.max_length;

if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
max_length = std::cmp::min(max_length, receive_buffer_bytes);
Expand Down
29 changes: 14 additions & 15 deletions src/sources/socket/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
SourceSender,
};

use super::{default_host_key, default_max_length, SocketConfig};
use super::{default_host_key, SocketConfig};

/// Unix domain socket configuration for the `socket` source.
#[configurable_component]
Expand All @@ -41,16 +41,6 @@ pub struct UnixConfig {
#[configurable(metadata(docs::examples = 508))]
pub socket_file_mode: Option<u32>,

/// The maximum buffer size of incoming messages.
///
/// Messages larger than this are truncated.
// TODO: communicated as deprecated in v0.29.0, can be removed in v0.30.0
#[configurable(
deprecated = "This option has been deprecated. Configure `max_length` on the framing config instead."
)]
#[configurable(metadata(docs::type_unit = "bytes"))]
pub max_length: Option<usize>,

/// Overrides the name of the log field used to add the peer host to each event.
///
/// The value will be the peer host's address, including the port i.e. `1.2.3.4:9000`.
Expand Down Expand Up @@ -82,7 +72,6 @@ impl UnixConfig {
Self {
path,
socket_file_mode: None,
max_length: default_max_length(),
host_key: default_host_key(),
framing: None,
decoding: default_decoding(),
Expand Down Expand Up @@ -135,12 +124,22 @@ pub(super) fn unix_datagram(
out: SourceSender,
log_namespace: LogNamespace,
) -> crate::Result<Source> {
let max_length = config
.framing
.and_then(|framing| match framing {
FramingConfig::CharacterDelimited {
character_delimited,
} => character_delimited.max_length,
FramingConfig::NewlineDelimited { newline_delimited } => newline_delimited.max_length,
FramingConfig::OctetCounting { octet_counting } => octet_counting.max_length,
_ => None,
})
.unwrap_or_else(crate::serde::default_max_length);

build_unix_datagram_source(
config.path,
config.socket_file_mode,
config
.max_length
.unwrap_or_else(crate::serde::default_max_length),
max_length,
decoder,
move |events, received_from| {
handle_events(events, &config.host_key, received_from, log_namespace)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
---
date: "2023-05-23"
title: "0.30 Upgrade Guide"
description: "An upgrade guide that addresses breaking changes in 0.30.0"
authors: ["neuronull"]
release: "0.30.0"
hide_on_release_notes: false
badges:
type: breaking change
---

Vector's 0.30.0 release includes **breaking changes**:

1. [Removal of the `socket` source's `tcp` and `unix` mode `max_length` setting](#socket-source-max-length)

We cover them below to help you upgrade quickly:

## Upgrade guide

### Breaking changes

#### Removal of the `socket` source's `tcp` and `unix` mode `max_length` setting {#socket-source-max-length}

In v0.29.0 the `max_length` setting, used by the `tcp` and `unix` modes
of the `socket` source, was marked as deprecated.

That setting was replaced by the `max_length` setting within the `framing`
setting.

Any explicit usages of `max_length` for those modes of the `socket`
source will no longer work and configurations will need to instead use
a `framing` setting in order to set a maximum length.
10 changes: 6 additions & 4 deletions website/cue/reference/components/sources/base/socket.cue
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,17 @@ base: components: sources: socket: configuration: {
type: uint: unit: "seconds"
}
max_length: {
deprecated: true
deprecated_message: "This option has been deprecated. Configure `max_length` on the framing config instead."
description: """
The maximum buffer size of incoming messages.

Messages larger than this are truncated.
"""
required: false
type: uint: unit: "bytes"
relevant_when: "mode = \"udp\""
required: false
type: uint: {
default: 102400
unit: "bytes"
}
}
mode: {
description: "The type of socket to use."
Expand Down