Skip to content

Commit

Permalink
[close #383] fix integration test case flow_control & `stop_downstr…
Browse files Browse the repository at this point in the history
…eam` (#392)

* collect pprof heap

Signed-off-by: Ping Yu <[email protected]>

* unlimit retry for pd connection

Signed-off-by: Ping Yu <[email protected]>

* reduce record size

Signed-off-by: Ping Yu <[email protected]>

* log level: info

Signed-off-by: Ping Yu <[email protected]>

* reduce data size; add grafana panel

Signed-off-by: Ping Yu <[email protected]>

* fix encoder size

Signed-off-by: Ping Yu <[email protected]>

* fix

Signed-off-by: Ping Yu <[email protected]>

* MQMessage pool

Signed-off-by: Ping Yu <[email protected]>

* fix release

Signed-off-by: Ping Yu <[email protected]>

* wip

Signed-off-by: Ping Yu <[email protected]>

* fix flaky ut

Signed-off-by: Ping Yu <[email protected]>

* logging

Signed-off-by: Ping Yu <[email protected]>

* fix ut

Signed-off-by: Ping Yu <[email protected]>

* adjust memory release parameter

Signed-off-by: Ping Yu <[email protected]>

* polish

Signed-off-by: Ping Yu <[email protected]>

* polish

Signed-off-by: Ping Yu <[email protected]>

* polish

Signed-off-by: Ping Yu <[email protected]>

---------

Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu authored Mar 4, 2024
1 parent 503fce8 commit 40357a1
Show file tree
Hide file tree
Showing 17 changed files with 249 additions and 82 deletions.
58 changes: 47 additions & 11 deletions cdc/cdc/sink/codec/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package codec
import (
"context"
"encoding/binary"
"sync"
"time"

"github.com/pingcap/log"
Expand Down Expand Up @@ -63,6 +64,29 @@ type MQMessage struct {
entriesCount int // entries in one MQ Message
}

const (
MemoryReleaseThreshold = 100 * 1024 // 100KiB
MemoryReleaseFactor = 100
)

func resetBuffer(buf []byte) []byte {
length := len(buf)
capSize := cap(buf)
if capSize > MemoryReleaseThreshold && length > 0 && length*MemoryReleaseFactor < capSize {
return nil
}
return buf[:0]
}

func (m *MQMessage) Reset() {
m.Key = resetBuffer(m.Key)
m.Value = resetBuffer(m.Value)
m.Ts = 0
m.Type = model.MqMessageTypeUnknown
m.Protocol = config.ProtocolDefault
m.entriesCount = 0
}

// maximumRecordOverhead is used to calculate ProducerMessage's byteSize by sarama kafka client.
// reference: https://github.com/Shopify/sarama/blob/66521126c71c522c15a36663ae9cddc2b024c799/async_producer.go#L233
// for TiKV-CDC, minimum supported kafka version is `0.11.0.2`, which will be treated as `version = 2` by sarama producer.
Expand Down Expand Up @@ -99,31 +123,43 @@ func newResolvedMQMessage(proto config.Protocol, key, value []byte, ts uint64) *
return NewMQMessage(proto, key, value, ts, model.MqMessageTypeResolved)
}

var mqMsgPool = sync.Pool{
New: func() any {
return new(MQMessage)
},
}

// NewMQMessage should be used when creating a MQMessage struct.
// It copies the input byte slices to avoid any surprises in asynchronous MQ writes.
func NewMQMessage(proto config.Protocol, key []byte, value []byte, ts uint64, ty model.MqMessageType) *MQMessage {
ret := &MQMessage{
Key: nil,
Value: nil,
Ts: ts,
Type: ty,
Protocol: proto,
entriesCount: 0,
ret := mqMsgPool.Get().(*MQMessage)

// TODO: remove this check.
if len(ret.Key) > 0 || len(ret.Value) > 0 {
log.Panic("MQMessage is not reset", zap.String("key", string(ret.Key)), zap.String("value", string(ret.Value)))
}

ret.Ts = ts
ret.Type = ty
ret.Protocol = proto
ret.entriesCount = 0

if key != nil {
ret.Key = make([]byte, len(key))
copy(ret.Key, key)
ret.Key = append(ret.Key, key...)
}

if value != nil {
ret.Value = make([]byte, len(value))
copy(ret.Value, value)
ret.Value = append(ret.Value, value...)
}

return ret
}

func ReleaseMQMessage(m *MQMessage) {
m.Reset()
mqMsgPool.Put(m)
}

// EventBatchDecoder is an abstraction for events decoder
// this interface is only for testing now
type EventBatchDecoder interface {
Expand Down
33 changes: 27 additions & 6 deletions cdc/cdc/sink/codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,10 @@ type JSONEventBatchEncoder struct {
valueBuf *bytes.Buffer // Deprecated: only used for MixedBuild for now
supportMixedBuild bool // TODO decouple this out

messageBuf []*MQMessage
curBatchSize int
messageBuf []*MQMessage
curBatchSize int
totalBatchBytes int // Note: The size of last message is not included

// configs
maxMessageBytes int
maxBatchSize int
Expand Down Expand Up @@ -226,6 +228,9 @@ func (d *JSONEventBatchEncoder) AppendChangedEvent(e *model.RawKVEntry) (Encoder
versionHead := make([]byte, 8)
binary.BigEndian.PutUint64(versionHead, BatchVersion1)

if len(d.messageBuf) > 0 {
d.totalBatchBytes += d.messageBuf[len(d.messageBuf)-1].Length()
}
d.messageBuf = append(d.messageBuf, NewMQMessage(config.ProtocolOpen, versionHead, nil, 0, model.MqMessageTypeKv))
d.curBatchSize = 0
}
Expand All @@ -249,6 +254,8 @@ func (d *JSONEventBatchEncoder) AppendChangedEvent(e *model.RawKVEntry) (Encoder
}

// Build implements the EventBatchEncoder interface
// NOTE: when supportMixedBuild is enabled, must call Reset() after the returned `mqMessages` is used.
// It's not a good design. As supportMixedBuild is used in unit tests only, we don't fix it now.
func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) {
if d.supportMixedBuild {
if d.valueBuf.Len() == 0 {
Expand All @@ -260,7 +267,7 @@ func (d *JSONEventBatchEncoder) Build() (mqMessages []*MQMessage) {
}

ret := d.messageBuf
d.messageBuf = make([]*MQMessage, 0)
d.Reset()
return ret
}

Expand Down Expand Up @@ -307,13 +314,27 @@ func (d *JSONEventBatchEncoder) MixedBuild(withVersion bool) []byte {

// Size implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) Size() int {
return d.keyBuf.Len() + d.valueBuf.Len()
if d.supportMixedBuild {
return d.keyBuf.Len() + d.valueBuf.Len()
}

lastMessageLength := 0
if len(d.messageBuf) > 0 {
lastMessageLength = d.messageBuf[len(d.messageBuf)-1].Length()
}
return d.totalBatchBytes + lastMessageLength
}

// Reset implements the EventBatchEncoder interface
func (d *JSONEventBatchEncoder) Reset() {
d.keyBuf.Reset()
d.valueBuf.Reset()
if d.supportMixedBuild {
d.keyBuf.Reset()
d.valueBuf.Reset()
} else {
d.messageBuf = make([]*MQMessage, 0)
d.curBatchSize = 0
d.totalBatchBytes = 0
}
}

// SetParams reads relevant parameters for Open Protocol
Expand Down
19 changes: 17 additions & 2 deletions cdc/cdc/sink/codec/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func (s *batchSuite) TestSetParams(c *check.C) {
func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
defer testleak.AfterTest(c)()
encoder := NewJSONEventBatchEncoder()
c.Check(encoder.Size(), check.Equals, 0)

// the size of `testEvent` is 75
testEvent := &model.RawKVEntry{
Expand All @@ -207,23 +208,35 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
ExpiredTs: 200,
}
eventSize := 75
// for a single message, the overhead is 36(maximumRecordOverhead) + 8(versionHea) = 44.
overhead := 36 + 8

// for a single message, the overhead is 36(maximumRecordOverhead) + 8(versionHea) = 44, just can hold it.
a := strconv.Itoa(eventSize + 44)
// just can hold a single message.
a := strconv.Itoa(eventSize + overhead)
err := encoder.SetParams(map[string]string{"max-message-bytes": a})
c.Check(err, check.IsNil)
r, err := encoder.AppendChangedEvent(testEvent)
c.Check(err, check.IsNil)
c.Check(r, check.Equals, EncoderNoOperation)
totalSize := eventSize + overhead
c.Check(encoder.Size(), check.Equals, totalSize)

r, err = encoder.AppendChangedEvent(testEvent)
c.Check(err, check.IsNil)
c.Check(r, check.Equals, EncoderNoOperation)
totalSize += eventSize + overhead
c.Check(encoder.Size(), check.Equals, totalSize)

a = strconv.Itoa(eventSize + 43)
err = encoder.SetParams(map[string]string{"max-message-bytes": a})
c.Assert(err, check.IsNil)
r, err = encoder.AppendChangedEvent(testEvent)
c.Check(err, check.NotNil)
c.Check(r, check.Equals, EncoderNoOperation)
c.Check(encoder.Size(), check.Equals, totalSize)

// make sure each batch's `Length` not greater than `max-message-bytes`
// 256: each message can hold 2 events (75 * 2 + 36 + 8 = 194)
err = encoder.SetParams(map[string]string{"max-message-bytes": "256"})
c.Check(err, check.IsNil)

Expand All @@ -232,6 +245,8 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
c.Check(r, check.Equals, EncoderNoOperation)
c.Check(err, check.IsNil)
}
totalSize += (eventSize*2 + overhead) * 5000
c.Check(encoder.Size(), check.Equals, totalSize)

messages := encoder.Build()
for _, msg := range messages {
Expand Down
2 changes: 1 addition & 1 deletion cdc/cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,11 @@ func (k *mqSink) runWorker(ctx context.Context, partition int32) error {
})

for _, msg := range messages {
thisBatchSize += msg.GetEntriesCount()
err := k.writeToProducer(ctx, msg, codec.EncoderNeedAsyncWrite, partition)
if err != nil {
return 0, err
}
thisBatchSize += msg.GetEntriesCount()
}

if op == codec.EncoderNeedSyncWrite {
Expand Down
24 changes: 19 additions & 5 deletions cdc/cdc/sink/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

type mqSinkSuite struct{}

var _ = check.Suite(&mqSinkSuite{})
var _ = check.SerialSuites(&mqSinkSuite{})

func (s mqSinkSuite) TestKafkaSink(c *check.C) {
defer testleak.AfterTest(c)()
Expand Down Expand Up @@ -144,6 +144,16 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) {
opts := map[string]string{}
errCh := make(chan error, 1)

newSaramaConfigImplBak := kafkap.NewSaramaConfigImpl
kafkap.NewSaramaConfigImpl = func(ctx context.Context, config *kafkap.Config) (*sarama.Config, error) {
cfg, err := newSaramaConfigImplBak(ctx, config)
c.Assert(err, check.IsNil)
cfg.Producer.Flush.MaxMessages = 1
return cfg, err
}
defer func() {
kafkap.NewSaramaConfigImpl = newSaramaConfigImplBak
}()
kafkap.NewAdminClientImpl = kafka.NewMockAdminClient
defer func() {
kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient
Expand All @@ -152,8 +162,11 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) {
sink, err := newKafkaSaramaSink(ctx, sinkURI, replicaConfig, opts, errCh)
c.Assert(err, check.IsNil)

// mock kafka broker processes 1 row changed event
leader.Returns(prodSuccess)
// mock kafka broker processes 3 row changed events
for i := 0; i < 3; i++ {
leader.Returns(prodSuccess)
}

keyspanID1 := model.KeySpanID(1)
kv1 := &model.RawKVEntry{
OpType: model.OpTypePut,
Expand Down Expand Up @@ -182,12 +195,13 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) {
StartTs: 110,
CRTs: 130,
}

err = sink.EmitChangedEvents(ctx, kv3)
c.Assert(err, check.IsNil)

// TODO: fix EmitCheckpointTs
// mock kafka broker processes 1 row resolvedTs event
leader.Returns(prodSuccess)
// leader.Returns(prodSuccess)

checkpointTs1, err := sink.FlushChangedEvents(ctx, keyspanID1, kv1.CRTs)
c.Assert(err, check.IsNil)
c.Assert(checkpointTs1, check.Equals, kv1.CRTs)
Expand Down
2 changes: 1 addition & 1 deletion cdc/cdc/sink/producer/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

func init() {
sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB
sarama.MaxRequestSize = 100 * 1024 * 1024 // 100MB
}

// Config stores user specified Kafka producer configuration
Expand Down
10 changes: 5 additions & 5 deletions cdc/cdc/sink/producer/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
ctx := context.Background()
config := NewConfig()
config.Version = "invalid"
_, err := newSaramaConfigImpl(ctx, config)
_, err := NewSaramaConfigImpl(ctx, config)
c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*")

ctx = util.SetOwnerInCtx(ctx)
config.Version = "2.6.0"
config.ClientID = "^invalid$"
_, err = newSaramaConfigImpl(ctx, config)
_, err = NewSaramaConfigImpl(ctx, config)
c.Assert(cerror.ErrKafkaInvalidClientID.Equal(err), check.IsTrue)

config.ClientID = "test-kafka-client"
Expand All @@ -56,15 +56,15 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
}
for _, cc := range compressionCases {
config.Compression = cc.algorithm
cfg, err := newSaramaConfigImpl(ctx, config)
cfg, err := NewSaramaConfigImpl(ctx, config)
c.Assert(err, check.IsNil)
c.Assert(cfg.Producer.Compression, check.Equals, cc.expected)
}

config.Credential = &security.Credential{
CAPath: "/invalid/ca/path",
}
_, err = newSaramaConfigImpl(ctx, config)
_, err = NewSaramaConfigImpl(ctx, config)
c.Assert(errors.Cause(err), check.ErrorMatches, ".*no such file or directory")

saslConfig := NewConfig()
Expand All @@ -76,7 +76,7 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
SaslMechanism: sarama.SASLTypeSCRAMSHA256,
}

cfg, err := newSaramaConfigImpl(ctx, saslConfig)
cfg, err := NewSaramaConfigImpl(ctx, saslConfig)
c.Assert(err, check.IsNil)
c.Assert(cfg, check.NotNil)
c.Assert(cfg.Net.SASL.User, check.Equals, "user")
Expand Down
Loading

0 comments on commit 40357a1

Please sign in to comment.