@@ -18,6 +18,7 @@ import (
 	"flag"
 	"fmt"
 	"math"
+	"math/rand"
 	"net/url"
 	"os"
 	"os/signal"
@@ -44,7 +45,7 @@ import (
 )
 
 const (
-	downstreamRetryInterval = 500 * time.Millisecond
+	downstreamRetryIntervalMs int = 200
 )
 
 // Sarama configuration options
@@ -379,15 +380,45 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
 	if sink == nil {
 		panic("sink should initialized")
 	}
-ClaimMessages:
+	kvs := make([]*model.RawKVEntry, 0)
 	for message := range claim.Messages() {
 		log.Debug("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
 		batchDecoder, err := codec.NewJSONEventBatchDecoder(message.Key, message.Value)
 		if err != nil {
 			return errors.Trace(err)
 		}
 
+		// Return error only when the session is closed
+		emitChangedEvents := func() error {
+			if len(kvs) == 0 {
+				return nil
+			}
+			for {
+				err = sink.EmitChangedEvents(ctx, kvs...)
+				if err == nil {
+					log.Debug("emit changed events", zap.Any("kvs", kvs))
+					lastCRTs := sink.lastCRTs.Load()
+					lastKv := kvs[len(kvs)-1]
+					if lastCRTs < lastKv.CRTs {
+						sink.lastCRTs.Store(lastKv.CRTs)
+					}
+					kvs = kvs[:0]
+					return nil
+				}
+
+				log.Warn("emit row changed event failed", zap.Error(err))
+				if session.Context().Err() != nil {
+					log.Warn("session closed", zap.Error(session.Context().Err()))
+					return session.Context().Err()
+				}
+
+				sleepMs := downstreamRetryIntervalMs + rand.Intn(downstreamRetryIntervalMs)
+				time.Sleep(time.Duration(sleepMs) * time.Millisecond)
+			}
+		}
+
 		counter := 0
+	KvLoop:
 		for {
			tp, hasNext, err := batchDecoder.HasNext()
 			if err != nil {
@@ -416,32 +447,21 @@ ClaimMessages:
 						zap.Uint64("globalResolvedTs", globalResolvedTs),
 						zap.Uint64("sinkResolvedTs", sink.resolvedTs.Load()),
 						zap.Int32("partition", partition))
-					break ClaimMessages
+					continue KvLoop
 				}
 
-				for {
-					err = sink.EmitChangedEvents(ctx, kv)
-					if err == nil {
-						log.Debug("emit changed events", zap.Any("kv", kv))
-						lastCRTs := sink.lastCRTs.Load()
-						if lastCRTs < kv.CRTs {
-							sink.lastCRTs.Store(kv.CRTs)
-						}
-						break
-					}
-
-					log.Warn("emit row changed event failed", zap.Error(err))
-					if session.Context().Err() != nil {
-						log.Warn("session closed", zap.Error(session.Context().Err()))
-						return nil
-					}
-					time.Sleep(downstreamRetryInterval)
-				}
+				kvs = append(kvs, kv)
 			case model.MqMessageTypeResolved:
 				ts, err := batchDecoder.NextResolvedEvent()
 				if err != nil {
 					log.Fatal("decode message value failed", zap.ByteString("value", message.Value))
 				}
+
+				if err := emitChangedEvents(); err != nil {
+					log.Info("session closed", zap.Error(err))
+					return nil
+				}
+
 				resolvedTs := sink.resolvedTs.Load()
 				if resolvedTs < ts {
 					log.Debug("update sink resolved ts",
@@ -450,13 +470,19 @@ ClaimMessages:
 					sink.resolvedTs.Store(ts)
 				}
 			}
-			session.MarkMessage(message, "")
 		}
 
 		if counter > kafkaMaxBatchSize {
 			log.Fatal("Open Protocol max-batch-size exceeded", zap.Int("max-batch-size", kafkaMaxBatchSize),
 				zap.Int("actual-batch-size", counter))
 		}
+
+		if err := emitChangedEvents(); err != nil {
+			log.Info("session closed", zap.Error(err))
+			return nil
+		}
+
+		session.MarkMessage(message, "")
 	}
 
 	return nil