Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions lib/vector-core/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ impl VectorSink {
}

/// Converts an event sink into a `VectorSink`
///
/// Deprecated in favor of `VectorSink::from_event_streamsink`.
#[deprecated]
pub fn from_event_sink(sink: impl Sink<Event, Error = ()> + Send + Unpin + 'static) -> Self {
VectorSink::Sink(Box::new(EventSink::new(sink)))
}
Expand Down
1 change: 1 addition & 0 deletions src/config/unit_test/unit_test_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ impl SinkConfig for UnitTestStreamSinkConfig {
let sink = self.sink.lock().await.take().unwrap();
let healthcheck = future::ok(()).boxed();

#[allow(deprecated)]
Ok((VectorSink::from_event_sink(sink), healthcheck))
}

Expand Down
1 change: 1 addition & 0 deletions src/sinks/appsignal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ impl SinkConfig for AppsignalSinkConfig {
)
.boxed();

#[allow(deprecated)]
Ok((super::VectorSink::from_event_sink(sink), healthcheck))
}

Expand Down
1 change: 1 addition & 0 deletions src/sinks/aws_cloudwatch_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ impl CloudWatchMetricsSvc {
})
});

#[allow(deprecated)]
Ok(VectorSink::from_event_sink(sink))
}

Expand Down
2 changes: 2 additions & 0 deletions src/sinks/azure_monitor_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ impl SinkConfig for AzureMonitorLogsConfig {
)
.sink_map_err(|error| error!(message = "Fatal azure_monitor_logs sink error.", %error));

#[allow(deprecated)]
Ok((VectorSink::from_event_sink(sink), healthcheck))
}

Expand Down Expand Up @@ -471,6 +472,7 @@ mod tests {
.sink_map_err(|error| error!(message = "Fatal azure_monitor_logs sink error.", %error));

let event = Event::Log(LogEvent::from("simple message"));
#[allow(deprecated)]
run_and_assert_sink_compliance(
VectorSink::from_event_sink(sink),
stream::once(ready(event)),
Expand Down
1 change: 1 addition & 0 deletions src/sinks/clickhouse/http_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub(crate) async fn build_http_sink(

let healthcheck = healthcheck(client, config).boxed();

#[allow(deprecated)]
Ok((VectorSink::from_event_sink(sink), healthcheck))
}

Expand Down
1 change: 1 addition & 0 deletions src/sinks/gcp/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ impl SinkConfig for PubsubConfig {
)
.sink_map_err(|error| error!(message = "Fatal gcp_pubsub sink error.", %error));

#[allow(deprecated)]
Ok((VectorSink::from_event_sink(sink), healthcheck))
}

Expand Down
1 change: 1 addition & 0 deletions src/sinks/gcp/stackdriver_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ impl SinkConfig for StackdriverConfig {
)
.sink_map_err(|error| error!(message = "Fatal gcp_stackdriver_logs sink error.", %error));

#[allow(deprecated)]
Ok((VectorSink::from_event_sink(sink), healthcheck))
}

Expand Down
1 change: 1 addition & 0 deletions src/sinks/gcp/stackdriver_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ impl SinkConfig for StackdriverConfig {
|error| error!(message = "Fatal gcp_stackdriver_metrics sink error.", %error),
);

#[allow(deprecated)]
Ok((VectorSink::from_event_sink(sink), healthcheck))
}

Expand Down
1 change: 1 addition & 0 deletions src/sinks/honeycomb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl SinkConfig for HoneycombConfig {

let healthcheck = healthcheck(self.clone(), client).boxed();

#[allow(deprecated)]
Ok((super::VectorSink::from_event_sink(sink), healthcheck))
}

Expand Down
1 change: 1 addition & 0 deletions src/sinks/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ impl SinkConfig for HttpSinkConfig {
)
.sink_map_err(|error| error!(message = "Fatal HTTP sink error.", %error));

#[allow(deprecated)]
let sink = super::VectorSink::from_event_sink(sink);

Ok((sink, healthcheck))
Expand Down
1 change: 1 addition & 0 deletions src/sinks/influxdb/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ impl SinkConfig for InfluxDbLogsConfig {
)
.sink_map_err(|error| error!(message = "Fatal influxdb_logs sink error.", %error));

#[allow(deprecated)]
Ok((VectorSink::from_event_sink(sink), healthcheck))
}

Expand Down
1 change: 1 addition & 0 deletions src/sinks/influxdb/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ impl InfluxDbSvc {
})
.sink_map_err(|error| error!(message = "Fatal influxdb sink error.", %error));

#[allow(deprecated)]
Ok(VectorSink::from_event_sink(sink))
}
}
Expand Down
1 change: 1 addition & 0 deletions src/sinks/mezmo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ impl SinkConfig for MezmoConfig {

let healthcheck = healthcheck(self.clone(), client).boxed();

#[allow(deprecated)]
Ok((super::VectorSink::from_event_sink(sink), healthcheck))
}

Expand Down
1 change: 1 addition & 0 deletions src/sinks/prometheus/remote_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ impl SinkConfig for RemoteWriteConfig {
)
};

#[allow(deprecated)]
Ok((sinks::VectorSink::from_event_sink(sink), healthcheck))
}

Expand Down
1 change: 1 addition & 0 deletions src/sinks/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ impl RedisSinkConfig {
})
.sink_map_err(|error| error!(message = "Sink failed to flush.", %error));

#[allow(deprecated)]
Ok(super::VectorSink::from_event_sink(sink))
}

Expand Down
1 change: 1 addition & 0 deletions src/sinks/sematext/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ impl SematextMetricsService {
})
.sink_map_err(|error| error!(message = "Fatal sematext metrics sink error.", %error));

#[allow(deprecated)]
Ok(VectorSink::from_event_sink(sink))
}
}
Expand Down
1 change: 1 addition & 0 deletions src/sinks/util/adaptive_concurrency/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ impl SinkConfig for TestConfig {
);
*self.controller_stats.lock().unwrap() = stats;

#[allow(deprecated)]
Ok((VectorSink::from_event_sink(sink), healthcheck))
}

Expand Down
1 change: 1 addition & 0 deletions src/test_util/mock/sinks/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl_generate_config_from_default!(ErrorSinkConfig);
#[async_trait]
impl SinkConfig for ErrorSinkConfig {
async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
#[allow(deprecated)]
Ok((VectorSink::from_event_sink(ErrorSink), ok(()).boxed()))
}

Expand Down
1 change: 1 addition & 0 deletions src/test_util/mock/sinks/panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl_generate_config_from_default!(PanicSinkConfig);
#[async_trait]
impl SinkConfig for PanicSinkConfig {
async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
#[allow(deprecated)]
Ok((VectorSink::from_event_sink(PanicSink), ok(()).boxed()))
}

Expand Down