Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ publish = false
default-run = "vector"
autobenches = false # our benchmarks are not runnable on their own either way
# Minimum supported rust version
rust-version = "1.66.0"
rust-version = "1.70.0"

[[bin]]
name = "vector"
Expand Down
1 change: 0 additions & 1 deletion lib/vector-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ indexmap = { version = "1.9", default-features = false }
inventory = { version = "0.3" }
no-proxy = { version = "0.3.1", default-features = false, features = ["serialize"] }
num-traits = { version = "0.2.15", default-features = false }
once_cell = { version = "1", default-features = false }
serde = { version = "1.0", default-features = false }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
serde_with = { version = "2.3.2", default-features = false, features = ["std"] }
Expand Down
7 changes: 3 additions & 4 deletions lib/vector-config/src/schema/parser/query.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{fs::File, io::BufReader, path::Path};
use std::{fs::File, io::BufReader, path::Path, sync::OnceLock};

use once_cell::sync::OnceCell;
use serde_json::Value;
use snafu::Snafu;
use vector_config_common::{
Expand Down Expand Up @@ -415,8 +414,8 @@ impl<'a> QueryableSchema for SimpleSchema<'a> {
}

fn schema_to_simple_schema(schema: &Schema) -> SimpleSchema<'_> {
static TRUE_SCHEMA_OBJECT: OnceCell<SchemaObject> = OnceCell::new();
static FALSE_SCHEMA_OBJECT: OnceCell<SchemaObject> = OnceCell::new();
static TRUE_SCHEMA_OBJECT: OnceLock<SchemaObject> = OnceLock::new();
static FALSE_SCHEMA_OBJECT: OnceLock<SchemaObject> = OnceLock::new();

let schema_object = match schema {
Schema::Bool(bool) => {
Expand Down
6 changes: 4 additions & 2 deletions lib/vector-core/src/config/log_schema.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::sync::OnceLock;

use lookup::lookup_v2::{parse_target_path, OptionalValuePath};
use lookup::{owned_value_path, OwnedTargetPath, OwnedValuePath};
use once_cell::sync::{Lazy, OnceCell};
use once_cell::sync::Lazy;
use vector_config::configurable_component;

static LOG_SCHEMA: OnceCell<LogSchema> = OnceCell::new();
static LOG_SCHEMA: OnceLock<LogSchema> = OnceLock::new();
static LOG_SCHEMA_DEFAULT: Lazy<LogSchema> = Lazy::new(LogSchema::default);

/// Loads Log Schema from configurations and sets global schema. Once this is
Expand Down
5 changes: 2 additions & 3 deletions lib/vector-core/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ mod recency;
mod recorder;
mod storage;

use std::time::Duration;
use std::{sync::OnceLock, time::Duration};

use chrono::Utc;
use metrics::Key;
use metrics_tracing_context::TracingContextLayer;
use metrics_util::layers::Layer;
use once_cell::sync::OnceCell;
use snafu::Snafu;

pub use self::ddsketch::{AgentDDSketch, BinMap, Config};
Expand All @@ -29,7 +28,7 @@ pub enum Error {
TimeoutMustBePositive { timeout: f64 },
}

static CONTROLLER: OnceCell<Controller> = OnceCell::new();
static CONTROLLER: OnceLock<Controller> = OnceLock::new();

// Cardinality counter parameters, expose the internal metrics registry
// cardinality. Useful for the end users to help understand the characteristics
Expand Down
3 changes: 1 addition & 2 deletions lib/vector-core/src/metrics/recorder.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::sync::{atomic::Ordering, Arc, RwLock};
use std::time::Duration;
use std::{cell::OnceCell, time::Duration};

use chrono::Utc;
use metrics::{Counter, Gauge, Histogram, Key, KeyName, Recorder, SharedString, Unit};
use metrics_util::{registry::Registry as MetricsRegistry, MetricKindMask};
use once_cell::unsync::OnceCell;
use quanta::Clock;

use super::recency::{GenerationalStorage, Recency};
Expand Down
5 changes: 2 additions & 3 deletions src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub mod region;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, OnceLock};
use std::task::{Context, Poll};
use std::time::SystemTime;

Expand All @@ -26,7 +26,6 @@ use aws_types::credentials::{ProvideCredentials, SharedCredentialsProvider};
use aws_types::region::Region;
use aws_types::SdkConfig;
use bytes::Bytes;
use once_cell::sync::OnceCell;
use regex::RegexSet;
pub use region::RegionOrEndpoint;
use tower::{Layer, Service, ServiceBuilder};
Expand All @@ -36,7 +35,7 @@ use crate::http::{build_proxy_connector, build_tls_connector};
use crate::internal_events::AwsBytesSent;
use crate::tls::{MaybeTlsSettings, TlsConfig};

static RETRIABLE_CODES: OnceCell<RegexSet> = OnceCell::new();
static RETRIABLE_CODES: OnceLock<RegexSet> = OnceLock::new();

pub fn is_retriable_error<T>(error: &SdkError<T>) -> bool {
match error {
Expand Down
6 changes: 2 additions & 4 deletions src/sinks/prometheus/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{
convert::Infallible,
hash::Hash,
mem::{discriminant, Discriminant},
net::SocketAddr,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{Arc, RwLock},
time::{Duration, Instant},
};
Expand Down Expand Up @@ -164,9 +164,7 @@ impl Default for PrometheusExporterConfig {
}
}

fn default_address() -> SocketAddr {
use std::net::{IpAddr, Ipv4Addr};

const fn default_address() -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 9598)
}

Expand Down
2 changes: 1 addition & 1 deletion src/sinks/statsd/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl Mode {
}
}

fn default_address() -> SocketAddr {
const fn default_address() -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8125)
}

Expand Down
11 changes: 2 additions & 9 deletions src/sinks/util/service/net/udp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::net::SocketAddr;

use snafu::ResultExt;
use tokio::net::UdpSocket;
Expand Down Expand Up @@ -59,7 +59,7 @@ impl UdpConnector {
.ok_or(NetError::NoAddresses)?;

let addr = SocketAddr::new(ip, self.address.port);
let bind_address = find_bind_address(&addr);
let bind_address = crate::sinks::util::udp::find_bind_address(&addr);

let socket = UdpSocket::bind(bind_address).await.context(FailedToBind)?;

Expand All @@ -74,10 +74,3 @@ impl UdpConnector {
Ok(socket)
}
}

fn find_bind_address(remote_addr: &SocketAddr) -> SocketAddr {
match remote_addr {
SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
SocketAddr::V6(_) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
}
}
2 changes: 1 addition & 1 deletion src/sinks/util/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ async fn udp_send(socket: &mut UdpSocket, buf: &[u8]) -> tokio::io::Result<()> {
Ok(())
}

fn find_bind_address(remote_addr: &SocketAddr) -> SocketAddr {
pub(super) const fn find_bind_address(remote_addr: &SocketAddr) -> SocketAddr {
match remote_addr {
SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0),
SocketAddr::V6(_) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0),
Expand Down
5 changes: 2 additions & 3 deletions src/sources/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
collections::{BTreeMap, HashMap},
io::Cursor,
sync::Arc,
sync::{Arc, OnceLock},
time::Duration,
};

Expand All @@ -14,7 +14,6 @@ use codecs::{
};
use futures::{Stream, StreamExt};
use lookup::{lookup_v2::OptionalValuePath, owned_value_path, path, OwnedValuePath};
use once_cell::sync::OnceCell;
use rdkafka::{
consumer::{CommitMode, Consumer, ConsumerContext, Rebalance, StreamConsumer},
message::{BorrowedMessage, Headers as _, Message},
Expand Down Expand Up @@ -724,7 +723,7 @@ fn create_consumer(config: &KafkaSourceConfig) -> crate::Result<StreamConsumer<C
#[derive(Default)]
struct CustomContext {
stats: kafka::KafkaStatisticsContext,
finalizer: OnceCell<Arc<OrderedFinalizer<FinalizerEntry>>>,
finalizer: OnceLock<Arc<OrderedFinalizer<FinalizerEntry>>>,
}

impl CustomContext {
Expand Down
5 changes: 2 additions & 3 deletions src/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ use std::{
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
Mutex, MutexGuard,
Mutex, MutexGuard, OnceLock,
},
};

use futures_util::{future::ready, Stream, StreamExt};
use lookup::event_path;
use metrics_tracing_context::MetricsLayer;
use once_cell::sync::OnceCell;
use tokio::sync::{
broadcast::{self, Receiver, Sender},
oneshot,
Expand Down Expand Up @@ -51,7 +50,7 @@ static SUBSCRIBERS: Mutex<Option<Vec<oneshot::Sender<Vec<LogEvent>>>>> =

/// SENDER holds the sender/receiver handle that will receive a copy of all the internal log events *after* the topology
/// has been initialized.
static SENDER: OnceCell<Sender<LogEvent>> = OnceCell::new();
static SENDER: OnceLock<Sender<LogEvent>> = OnceLock::new();

fn metrics_layer_enabled() -> bool {
!matches!(std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION"), Ok(x) if x == "true")
Expand Down
11 changes: 6 additions & 5 deletions vdev/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::ffi::{OsStr, OsString};
pub use std::process::Command;
use std::{
borrow::Cow, env, io::Read, path::PathBuf, process::ExitStatus, process::Stdio, time::Duration,
borrow::Cow, env, io::Read, path::PathBuf, process::ExitStatus, process::Stdio, sync::OnceLock,
time::Duration,
};

use anyhow::{bail, Context as _, Result};
use indicatif::{ProgressBar, ProgressStyle};
use log::LevelFilter;
use once_cell::sync::{Lazy, OnceCell};
use once_cell::sync::Lazy;

use crate::{config::Config, git, platform, util};

Expand All @@ -25,9 +26,9 @@ const DEFAULT_SHELL: &str = "/bin/sh";
pub static SHELL: Lazy<OsString> =
Lazy::new(|| (env::var_os("SHELL").unwrap_or_else(|| DEFAULT_SHELL.into())));

static VERBOSITY: OnceCell<LevelFilter> = OnceCell::new();
static CONFIG: OnceCell<Config> = OnceCell::new();
static PATH: OnceCell<String> = OnceCell::new();
static VERBOSITY: OnceLock<LevelFilter> = OnceLock::new();
static CONFIG: OnceLock<Config> = OnceLock::new();
static PATH: OnceLock<String> = OnceLock::new();

pub fn verbosity() -> &'static LevelFilter {
VERBOSITY.get().expect("verbosity is not initialized")
Expand Down