@@ -38,23 +38,26 @@ use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
38
38
use risingwave_pb:: connector_service:: sink_metadata:: SerializedMetadata ;
39
39
use risingwave_pb:: connector_service:: SinkMetadata ;
40
40
use serde_derive:: { Deserialize , Serialize } ;
41
- use serde_with:: serde_as;
41
+ use serde_with:: { serde_as, DisplayFromStr } ;
42
42
use with_options:: WithOptions ;
43
43
44
44
use super :: catalog:: desc:: SinkDesc ;
45
45
use super :: coordinate:: CoordinatedSinkWriter ;
46
- use super :: decouple_checkpoint_log_sink:: DecoupleCheckpointLogSinkerOf ;
46
+ use super :: decouple_checkpoint_log_sink:: {
47
+ default_commit_checkpoint_interval, DecoupleCheckpointLogSinkerOf ,
48
+ DEFAULT_COMMIT_CHECKPOINT_INTERVAL ,
49
+ } ;
47
50
use super :: writer:: SinkWriter ;
48
51
use super :: {
49
52
Result , Sink , SinkCommitCoordinator , SinkError , SinkParam , SinkWriterParam ,
50
53
SINK_TYPE_APPEND_ONLY , SINK_USER_FORCE_APPEND_ONLY_OPTION ,
51
54
} ;
52
- use crate :: deserialize_optional_u64_from_string;
53
55
54
56
pub const DELTALAKE_SINK : & str = "deltalake" ;
55
57
pub const DEFAULT_REGION : & str = "us-east-1" ;
56
58
pub const GCS_SERVICE_ACCOUNT : & str = "service_account_key" ;
57
59
60
+ #[ serde_as]
58
61
#[ derive( Deserialize , Serialize , Debug , Clone , WithOptions ) ]
59
62
pub struct DeltaLakeCommon {
60
63
#[ serde( rename = "s3.access.key" ) ]
@@ -69,10 +72,12 @@ pub struct DeltaLakeCommon {
69
72
pub s3_endpoint : Option < String > ,
70
73
#[ serde( rename = "gcs.service.account" ) ]
71
74
pub gcs_service_account : Option < String > ,
72
- /// Commit every n(>0) checkpoints, if n is not set, we will commit every checkpoint.
73
- #[ serde( default , deserialize_with = "deserialize_optional_u64_from_string" ) ]
74
- pub commit_checkpoint_interval : Option < u64 > ,
75
+ /// Commit every n(>0) checkpoints, defalut is 10.
76
+ #[ serde( default = "default_commit_checkpoint_interval" ) ]
77
+ #[ serde_as( as = "DisplayFromStr" ) ]
78
+ pub commit_checkpoint_interval : u64 ,
75
79
}
80
+
76
81
impl DeltaLakeCommon {
77
82
pub async fn create_deltalake_client ( & self ) -> Result < DeltaTable > {
78
83
let table = match Self :: get_table_url ( & self . location ) ? {
@@ -281,26 +286,25 @@ impl Sink for DeltaLakeSink {
281
286
const SINK_NAME : & ' static str = DELTALAKE_SINK ;
282
287
283
288
fn is_sink_decouple ( desc : & SinkDesc , user_specified : & SinkDecouple ) -> Result < bool > {
284
- let config_decouple = if let Some ( interval ) =
285
- desc. properties . get ( "commit_checkpoint_interval" )
286
- && interval . parse :: < u64 > ( ) . unwrap_or ( 0 ) > 1
287
- {
288
- true
289
- } else {
290
- false
291
- } ;
289
+ let commit_checkpoint_interval =
290
+ if let Some ( interval ) = desc. properties . get ( "commit_checkpoint_interval" ) {
291
+ interval
292
+ . parse :: < u64 > ( )
293
+ . unwrap_or ( DEFAULT_COMMIT_CHECKPOINT_INTERVAL )
294
+ } else {
295
+ DEFAULT_COMMIT_CHECKPOINT_INTERVAL
296
+ } ;
292
297
293
298
match user_specified {
294
- SinkDecouple :: Default => Ok ( config_decouple ) ,
299
+ SinkDecouple :: Default | SinkDecouple :: Enable => Ok ( true ) ,
295
300
SinkDecouple :: Disable => {
296
- if config_decouple {
301
+ if commit_checkpoint_interval > 1 {
297
302
return Err ( SinkError :: Config ( anyhow ! (
298
303
"config conflict: DeltaLake config `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
299
304
) ) ) ;
300
305
}
301
306
Ok ( false )
302
307
}
303
- SinkDecouple :: Enable => Ok ( true ) ,
304
308
}
305
309
}
306
310
@@ -328,7 +332,7 @@ impl Sink for DeltaLakeSink {
328
332
. await ?;
329
333
330
334
let commit_checkpoint_interval =
331
- NonZeroU64 :: new ( self . config . common . commit_checkpoint_interval . unwrap_or ( 1 ) ) . expect (
335
+ NonZeroU64 :: new ( self . config . common . commit_checkpoint_interval ) . expect (
332
336
"commit_checkpoint_interval should be greater than 0, and it should be checked in config validation" ,
333
337
) ;
334
338
@@ -380,9 +384,9 @@ impl Sink for DeltaLakeSink {
380
384
) ) ) ;
381
385
}
382
386
}
383
- if self . config . common . commit_checkpoint_interval == Some ( 0 ) {
387
+ if self . config . common . commit_checkpoint_interval == 0 {
384
388
return Err ( SinkError :: Config ( anyhow ! (
385
- "commit_checkpoint_interval must be greater than 0"
389
+ "` commit_checkpoint_interval` must be greater than 0"
386
390
) ) ) ;
387
391
}
388
392
Ok ( ( ) )
0 commit comments