Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions src/sinks/clickhouse/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ pub struct ClickhouseConfig {

/// The table that data is inserted into.
#[configurable(metadata(docs::examples = "mytable"))]
pub table: String,
pub table: Template,

/// The database that contains the table that data is inserted into.
#[configurable(metadata(docs::examples = "mydatabase"))]
pub database: Option<String>,
pub database: Option<Template>,

/// Sets `input_format_skip_unknown_fields`, allowing ClickHouse to discard fields not present in the table schema.
#[serde(default)]
Expand Down Expand Up @@ -90,25 +90,30 @@ impl SinkConfig for ClickhouseConfig {
let service = ClickhouseService::new(
client.clone(),
auth.clone(),
&endpoint,
self.database.as_deref(),
self.table.as_str(),
endpoint.clone(),
self.skip_unknown_fields,
self.date_time_best_effort,
)?;
);

let request_limits = self.request.unwrap_with(&Default::default());
let service = ServiceBuilder::new()
.settings(request_limits, ClickhouseRetryLogic::default())
.service(service);

let batch_settings = self.batch.into_batcher_settings()?;
let database = self.database.clone().unwrap_or_else(|| {
"default"
.try_into()
.expect("'default' should be a valid template")
});
let sink = ClickhouseSink::new(
batch_settings,
self.compression,
self.encoding.clone(),
service,
protocol,
database,
self.table.clone(),
);

let healthcheck = Box::pin(healthcheck(client, endpoint, auth));
Expand All @@ -126,7 +131,6 @@ impl SinkConfig for ClickhouseConfig {
}

async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option<Auth>) -> crate::Result<()> {
// TODO: check if table exists?
let uri = format!("{}/?query=SELECT%201", endpoint);
let mut request = Request::get(uri).body(Body::empty()).unwrap();

Expand Down
74 changes: 69 additions & 5 deletions src/sinks/clickhouse/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn insert_events() {

let config = ClickhouseConfig {
endpoint: host.parse().unwrap(),
table: table.clone(),
table: table.clone().try_into().unwrap(),
compression: Compression::None,
batch,
request: TowerRequestConfig {
Expand Down Expand Up @@ -94,7 +94,7 @@ async fn skip_unknown_fields() {

let config = ClickhouseConfig {
endpoint: host.parse().unwrap(),
table: table.clone(),
table: table.clone().try_into().unwrap(),
skip_unknown_fields: true,
compression: Compression::None,
batch,
Expand Down Expand Up @@ -140,7 +140,7 @@ async fn insert_events_unix_timestamps() {

let config = ClickhouseConfig {
endpoint: host.parse().unwrap(),
table: table.clone(),
table: table.clone().try_into().unwrap(),
compression: Compression::None,
encoding: Transformer::new(None, None, Some(TimestampFormat::Unix)).unwrap(),
batch,
Expand Down Expand Up @@ -269,7 +269,7 @@ async fn no_retry_on_incorrect_data() {

let config = ClickhouseConfig {
endpoint: host.parse().unwrap(),
table: table.clone(),
table: table.clone().try_into().unwrap(),
compression: Compression::None,
batch,
..Default::default()
Expand Down Expand Up @@ -320,7 +320,7 @@ async fn no_retry_on_incorrect_data_warp() {

let config = ClickhouseConfig {
endpoint: host.parse().unwrap(),
table: gen_table(),
table: gen_table().try_into().unwrap(),
batch,
..Default::default()
};
Expand All @@ -338,6 +338,70 @@ async fn no_retry_on_incorrect_data_warp() {
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
}

/// Integration test: a templated `table` option routes each event to the
/// table named by that event's `table` field, and every event is delivered.
#[tokio::test]
async fn templated_table() {
    trace_init();

    // Build one (table, event, receiver) case per target table; each event
    // carries its destination table name in its "table" field so the
    // `{{ .table }}` template below resolves to a distinct table per event.
    let mut cases: Vec<(String, Event, BatchStatusReceiver)> = Vec::new();
    for _ in 0..2 {
        let table = gen_table();
        let (mut event, receiver) = make_event();
        event.as_mut_log().insert("table", table.as_str());
        cases.push((table, event, receiver));
    }

    let host = clickhouse_address();

    // Force one event per batch so each table gets its own insert.
    let mut batch = BatchConfig::default();
    batch.max_events = Some(1);

    let config = ClickhouseConfig {
        endpoint: host.parse().unwrap(),
        table: "{{ .table }}".try_into().unwrap(),
        batch,
        ..Default::default()
    };

    // Pre-create every destination table before running the sink.
    let client = ClickhouseClient::new(host);
    for (table, _, _) in &cases {
        client
            .create_table(
                table,
                "host String, timestamp String, message String, table String",
            )
            .await;
    }

    let (sink, _hc) = config.build(SinkContext::default()).await.unwrap();

    let events: Vec<Event> = cases.iter().map(|(_, event, _)| event.clone()).collect();
    run_and_assert_sink_compliance(sink, stream::iter(events), &SINK_TAGS).await;

    // Each table should contain exactly its own event, and each event's
    // batch should have been acknowledged as delivered.
    for (table, event, mut receiver) in cases {
        let output = client.select_all(&table).await;
        assert_eq!(1, output.rows, "table {} should have 1 row", table);

        let expected = serde_json::to_value(event.into_log()).unwrap();
        assert_eq!(
            expected, output.data[0],
            "table \"{}\"'s one row should have the correct data",
            table
        );

        assert_eq!(
            receiver.try_recv(),
            Ok(BatchStatus::Delivered),
            "table \"{}\"'s event should have been delivered",
            table
        );
    }
}

fn make_event() -> (Event, BatchStatusReceiver) {
let (batch, receiver) = BatchNotifier::new_with_receiver();
let mut event = LogEvent::from("raw log line").with_batch_notifier(&batch);
Expand Down
61 changes: 35 additions & 26 deletions src/sinks/clickhouse/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use crate::{

#[derive(Debug, Clone)]
pub struct ClickhouseRequest {
pub database: String,
pub table: String,
pub body: Bytes,
pub compression: Compression,
pub finalizers: EventFinalizers,
Expand Down Expand Up @@ -107,31 +109,28 @@ impl RetryLogic for ClickhouseRetryLogic {
#[derive(Debug, Clone)]
pub struct ClickhouseService {
client: HttpClient,
uri: Uri,
auth: Option<Auth>,
endpoint: Uri,
skip_unknown_fields: bool,
date_time_best_effort: bool,
}

impl ClickhouseService {
/// Creates a new `ClickhouseService`.
pub fn new(
pub const fn new(
client: HttpClient,
auth: Option<Auth>,
endpoint: &Uri,
database: Option<&str>,
table: &str,
endpoint: Uri,
skip_unknown_fields: bool,
date_time_best_effort: bool,
) -> crate::Result<Self> {
// Set the URI query once during initialization, as it won't change throughout the lifecycle
// of the service.
let uri = set_uri_query(
) -> Self {
Self {
client,
auth,
endpoint,
database.unwrap_or("default"),
table,
skip_unknown_fields,
date_time_best_effort,
)?;
Ok(Self { client, auth, uri })
}
}
}

Expand All @@ -148,22 +147,32 @@ impl Service<ClickhouseRequest> for ClickhouseService {
// Emission of Error internal event is handled upstream by the caller.
fn call(&mut self, request: ClickhouseRequest) -> Self::Future {
let mut client = self.client.clone();
let auth = self.auth.clone();

let mut builder = Request::post(&self.uri)
.header(CONTENT_TYPE, "application/x-ndjson")
.header(CONTENT_LENGTH, request.body.len());
if let Some(ce) = request.compression.content_encoding() {
builder = builder.header(CONTENT_ENCODING, ce);
}
if let Some(auth) = &self.auth {
builder = auth.apply_builder(builder);
}

let http_request = builder
.body(Body::from(request.body))
.expect("building HTTP request failed unexpectedly");
// Build the URI outside of the boxed future to avoid unnecessary clones.
let uri = set_uri_query(
&self.endpoint,
&request.database,
&request.table,
self.skip_unknown_fields,
self.date_time_best_effort,
);

Box::pin(async move {
let mut builder = Request::post(&uri?)
.header(CONTENT_TYPE, "application/x-ndjson")
.header(CONTENT_LENGTH, request.body.len());
if let Some(ce) = request.compression.content_encoding() {
builder = builder.header(CONTENT_ENCODING, ce);
}
if let Some(auth) = auth {
builder = auth.apply_builder(builder);
}

let http_request = builder
.body(Body::from(request.body))
.expect("building HTTP request failed unexpectedly");

let response = client.call(http_request).in_current_span().await?;
let (parts, body) = response.into_parts();
let body = body::to_bytes(body).await?;
Expand Down
Loading