Skip to content

Commit 785e166

Browse files
committed
su
su fmt
1 parent 7ebe0a2 commit 785e166

15 files changed

+106
-174
lines changed

src/connector/src/lib.rs

-21
Original file line numberDiff line numberDiff line change
@@ -75,27 +75,6 @@ where
7575
})
7676
}
7777

78-
pub(crate) fn deserialize_optional_u64_from_string<'de, D>(
79-
deserializer: D,
80-
) -> Result<Option<u64>, D::Error>
81-
where
82-
D: de::Deserializer<'de>,
83-
{
84-
let s: String = de::Deserialize::deserialize(deserializer)?;
85-
if s.is_empty() {
86-
Ok(None)
87-
} else {
88-
s.parse()
89-
.map_err(|_| {
90-
de::Error::invalid_value(
91-
de::Unexpected::Str(&s),
92-
&"integer greater than or equal to 0",
93-
)
94-
})
95-
.map(Some)
96-
}
97-
}
98-
9978
pub(crate) fn deserialize_optional_string_seq_from_string<'de, D>(
10079
deserializer: D,
10180
) -> std::result::Result<Option<Vec<String>>, D::Error>

src/connector/src/sink/clickhouse.rs

+23-20
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,18 @@ use risingwave_common::types::{DataType, Decimal, ScalarRefImpl, Serial};
3030
use serde::ser::{SerializeSeq, SerializeStruct};
3131
use serde::Serialize;
3232
use serde_derive::Deserialize;
33-
use serde_with::serde_as;
33+
use serde_with::{serde_as, DisplayFromStr};
3434
use thiserror_ext::AsReport;
3535
use tonic::async_trait;
3636
use tracing::warn;
3737
use with_options::WithOptions;
3838

39-
use super::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf;
39+
use super::decouple_checkpoint_log_sink::{
40+
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf,
41+
DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
42+
};
4043
use super::writer::SinkWriter;
4144
use super::{DummySinkCommitCoordinator, SinkWriterParam};
42-
use crate::deserialize_optional_u64_from_string;
4345
use crate::error::ConnectorResult;
4446
use crate::sink::catalog::desc::SinkDesc;
4547
use crate::sink::{
@@ -52,6 +54,7 @@ const QUERY_COLUMN: &str =
5254
"select distinct ?fields from system.columns where database = ? and table = ? order by ?";
5355
pub const CLICKHOUSE_SINK: &str = "clickhouse";
5456

57+
#[serde_as]
5558
#[derive(Deserialize, Debug, Clone, WithOptions)]
5659
pub struct ClickHouseCommon {
5760
#[serde(rename = "clickhouse.url")]
@@ -66,9 +69,10 @@ pub struct ClickHouseCommon {
6669
pub table: String,
6770
#[serde(rename = "clickhouse.delete.column")]
6871
pub delete_column: Option<String>,
69-
/// Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint.
70-
#[serde(default, deserialize_with = "deserialize_optional_u64_from_string")]
71-
pub commit_checkpoint_interval: Option<u64>,
72+
/// Commit every n (> 0) checkpoints; the default is 10.
73+
#[serde(default = "default_commit_checkpoint_interval")]
74+
#[serde_as(as = "DisplayFromStr")]
75+
pub commit_checkpoint_interval: u64,
7276
}
7377

7478
#[allow(clippy::enum_variant_names)]
@@ -494,26 +498,25 @@ impl Sink for ClickHouseSink {
494498
const SINK_NAME: &'static str = CLICKHOUSE_SINK;
495499

496500
fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
497-
let config_decouple = if let Some(interval) =
498-
desc.properties.get("commit_checkpoint_interval")
499-
&& interval.parse::<u64>().unwrap_or(0) > 1
500-
{
501-
true
502-
} else {
503-
false
504-
};
501+
let commit_checkpoint_interval =
502+
if let Some(interval) = desc.properties.get("commit_checkpoint_interval") {
503+
interval
504+
.parse::<u64>()
505+
.unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL)
506+
} else {
507+
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
508+
};
505509

506510
match user_specified {
507-
SinkDecouple::Default => Ok(config_decouple),
511+
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
508512
SinkDecouple::Disable => {
509-
if config_decouple {
513+
if commit_checkpoint_interval > 1 {
510514
return Err(SinkError::Config(anyhow!(
511515
"config conflict: Clickhouse config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
512516
)));
513517
}
514518
Ok(false)
515519
}
516-
SinkDecouple::Enable => Ok(true),
517520
}
518521
}
519522

@@ -552,9 +555,9 @@ impl Sink for ClickHouseSink {
552555
self.check_pk_match(&clickhouse_column)?;
553556
}
554557

555-
if self.config.common.commit_checkpoint_interval == Some(0) {
558+
if self.config.common.commit_checkpoint_interval == 0 {
556559
return Err(SinkError::Config(anyhow!(
557-
"commit_checkpoint_interval must be greater than 0"
560+
"`commit_checkpoint_interval` must be greater than 0"
558561
)));
559562
}
560563
Ok(())
@@ -569,7 +572,7 @@ impl Sink for ClickHouseSink {
569572
)
570573
.await?;
571574
let commit_checkpoint_interval =
572-
NonZeroU64::new(self.config.common.commit_checkpoint_interval.unwrap_or(1)).expect(
575+
NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
573576
"commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
574577
);
575578

src/connector/src/sink/decouple_checkpoint_log_sink.rs

+5
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ use async_trait::async_trait;
2020
use crate::sink::log_store::{LogStoreReadItem, TruncateOffset};
2121
use crate::sink::writer::SinkWriter;
2222
use crate::sink::{LogSinker, Result, SinkLogReader, SinkMetrics};
23+
pub const DEFAULT_COMMIT_CHECKPOINT_INTERVAL: u64 = 10;
24+
25+
pub fn default_commit_checkpoint_interval() -> u64 {
26+
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
27+
}
2328

2429
/// The `LogSinker` implementation used for commit-decoupled sinks (such as `Iceberg`, `DeltaLake` and `StarRocks`).
2530
/// The concurrent/frequent commit capability of these sinks is poor, so by leveraging the decoupled log reader,

src/connector/src/sink/deltalake.rs

+24-20
Original file line numberDiff line numberDiff line change
@@ -38,23 +38,26 @@ use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
3838
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
3939
use risingwave_pb::connector_service::SinkMetadata;
4040
use serde_derive::{Deserialize, Serialize};
41-
use serde_with::serde_as;
41+
use serde_with::{serde_as, DisplayFromStr};
4242
use with_options::WithOptions;
4343

4444
use super::catalog::desc::SinkDesc;
4545
use super::coordinate::CoordinatedSinkWriter;
46-
use super::decouple_checkpoint_log_sink::DecoupleCheckpointLogSinkerOf;
46+
use super::decouple_checkpoint_log_sink::{
47+
default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf,
48+
DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
49+
};
4750
use super::writer::SinkWriter;
4851
use super::{
4952
Result, Sink, SinkCommitCoordinator, SinkError, SinkParam, SinkWriterParam,
5053
SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
5154
};
52-
use crate::deserialize_optional_u64_from_string;
5355

5456
pub const DELTALAKE_SINK: &str = "deltalake";
5557
pub const DEFAULT_REGION: &str = "us-east-1";
5658
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";
5759

60+
#[serde_as]
5861
#[derive(Deserialize, Serialize, Debug, Clone, WithOptions)]
5962
pub struct DeltaLakeCommon {
6063
#[serde(rename = "s3.access.key")]
@@ -69,10 +72,12 @@ pub struct DeltaLakeCommon {
6972
pub s3_endpoint: Option<String>,
7073
#[serde(rename = "gcs.service.account")]
7174
pub gcs_service_account: Option<String>,
72-
/// Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint.
73-
#[serde(default, deserialize_with = "deserialize_optional_u64_from_string")]
74-
pub commit_checkpoint_interval: Option<u64>,
75+
/// Commit every n (> 0) checkpoints; the default is 10.
76+
#[serde(default = "default_commit_checkpoint_interval")]
77+
#[serde_as(as = "DisplayFromStr")]
78+
pub commit_checkpoint_interval: u64,
7579
}
80+
7681
impl DeltaLakeCommon {
7782
pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
7883
let table = match Self::get_table_url(&self.location)? {
@@ -281,26 +286,25 @@ impl Sink for DeltaLakeSink {
281286
const SINK_NAME: &'static str = DELTALAKE_SINK;
282287

283288
fn is_sink_decouple(desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
284-
let config_decouple = if let Some(interval) =
285-
desc.properties.get("commit_checkpoint_interval")
286-
&& interval.parse::<u64>().unwrap_or(0) > 1
287-
{
288-
true
289-
} else {
290-
false
291-
};
289+
let commit_checkpoint_interval =
290+
if let Some(interval) = desc.properties.get("commit_checkpoint_interval") {
291+
interval
292+
.parse::<u64>()
293+
.unwrap_or(DEFAULT_COMMIT_CHECKPOINT_INTERVAL)
294+
} else {
295+
DEFAULT_COMMIT_CHECKPOINT_INTERVAL
296+
};
292297

293298
match user_specified {
294-
SinkDecouple::Default => Ok(config_decouple),
299+
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
295300
SinkDecouple::Disable => {
296-
if config_decouple {
301+
if commit_checkpoint_interval > 1 {
297302
return Err(SinkError::Config(anyhow!(
298303
"config conflict: DeltaLake config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
299304
)));
300305
}
301306
Ok(false)
302307
}
303-
SinkDecouple::Enable => Ok(true),
304308
}
305309
}
306310

@@ -328,7 +332,7 @@ impl Sink for DeltaLakeSink {
328332
.await?;
329333

330334
let commit_checkpoint_interval =
331-
NonZeroU64::new(self.config.common.commit_checkpoint_interval.unwrap_or(1)).expect(
335+
NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
332336
"commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
333337
);
334338

@@ -380,9 +384,9 @@ impl Sink for DeltaLakeSink {
380384
)));
381385
}
382386
}
383-
if self.config.common.commit_checkpoint_interval == Some(0) {
387+
if self.config.common.commit_checkpoint_interval == 0 {
384388
return Err(SinkError::Config(anyhow!(
385-
"commit_checkpoint_interval must be greater than 0"
389+
"`commit_checkpoint_interval` must be greater than 0"
386390
)));
387391
}
388392
Ok(())

src/connector/src/sink/google_pubsub.rs

-9
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,11 @@ use google_cloud_pubsub::client::{Client, ClientConfig};
2929
use google_cloud_pubsub::publisher::{Awaiter, Publisher};
3030
use risingwave_common::array::StreamChunk;
3131
use risingwave_common::catalog::Schema;
32-
use risingwave_common::session_config::sink_decouple::SinkDecouple;
3332
use serde_derive::Deserialize;
3433
use serde_with::serde_as;
3534
use tonic::Status;
3635
use with_options::WithOptions;
3736

38-
use super::catalog::desc::SinkDesc;
3937
use super::catalog::SinkFormatDesc;
4038
use super::formatter::SinkFormatterImpl;
4139
use super::log_store::DeliveryFutureManagerAddFuture;
@@ -114,13 +112,6 @@ impl Sink for GooglePubSubSink {
114112

115113
const SINK_NAME: &'static str = PUBSUB_SINK;
116114

117-
fn is_sink_decouple(_desc: &SinkDesc, user_specified: &SinkDecouple) -> Result<bool> {
118-
match user_specified {
119-
SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
120-
SinkDecouple::Disable => Ok(false),
121-
}
122-
}
123-
124115
async fn validate(&self) -> Result<()> {
125116
if !self.is_append_only {
126117
return Err(SinkError::GooglePubSub(anyhow!(

0 commit comments

Comments
 (0)