Skip to content

Commit

Permalink
Merge branch 'main' into fix-it-http-api
Browse files Browse the repository at this point in the history
  • Loading branch information
pingyu authored Aug 25, 2023
2 parents e46a07a + 9b4dc22 commit ba225c0
Show file tree
Hide file tree
Showing 24 changed files with 535 additions and 24 deletions.
4 changes: 3 additions & 1 deletion cdc/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ ifeq (${CDC_ENABLE_VENDOR}, 1)
GOVENDORFLAG := -mod=vendor
endif

BUILD_FLAG := -buildvcs=false

GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -trimpath $(GOVENDORFLAG)
GOBUILD_DEBUG := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -gcflags "all=-N -l" $(GOVENDORFLAG)
GOBUILDNOVENDOR := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -trimpath
Expand All @@ -43,7 +45,7 @@ FAILPOINT_DISABLE := $$(echo $(FAILPOINT_DIR) | xargs $(FAILPOINT) disable >/dev

RELEASE_VERSION ?=
ifeq ($(RELEASE_VERSION),)
RELEASE_VERSION := v1.1-master
RELEASE_VERSION := v1.2-master
release_version_regex := ^cdc-v[0-9]\..*$$
release_branch_regex := "^cdc-[0-9]\.[0-9].*$$|^HEAD$$|^.*/*tags/cdc-v[0-9]\.[0-9]\..*$$"
ifneq ($(shell git rev-parse --abbrev-ref HEAD | egrep $(release_branch_regex)),)
Expand Down
2 changes: 1 addition & 1 deletion cdc/cdc/http_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (s *httpStatusSuite) TestServerTLSWithCommonName(c *check.C) {
resp.Body.Close()
return nil
}, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50), retry.WithIsRetryableErr(cerrors.IsRetryableError))
c.Assert(strings.Contains(err.Error(), "remote error: tls: bad certificate"), check.IsTrue)
c.Assert(strings.Contains(err.Error(), "remote error: tls: "), check.IsTrue)

// test cli sends request with a cert will success
err = retry.Do(ctx, func() error {
Expand Down
4 changes: 3 additions & 1 deletion cdc/cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,9 +1110,11 @@ func (s *eventFeedSession) receiveFromStream(
changefeedID := util.ChangefeedIDFromCtx(ctx)
metricSendEventBatchResolvedSize := batchResolvedEventSize.WithLabelValues(captureAddr, changefeedID)

eventFilter := util.EventFilterFromCtx(ctx)

// always create a new region worker, because `receiveFromStreamV2` is ensured
// to call exactly once from outter code logic
worker := newRegionWorker(s, addr)
worker := newRegionWorker(s, addr, eventFilter)

defer worker.evictAllRegions()

Expand Down
8 changes: 8 additions & 0 deletions cdc/cdc/kv/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ var (
Name: "pull_event_count",
Help: "event count received by this puller",
}, []string{"type", "capture", "changefeed"})
filterOutEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tikv_cdc",
Subsystem: "kvclient",
Name: "filter_out_event_count",
Help: "event count filtered out by event filter",
}, []string{"type", "capture", "changefeed"})
sendEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tikv_cdc",
Expand Down Expand Up @@ -111,6 +118,7 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(eventFeedGauge)
registry.MustRegister(pullEventCounter)
registry.MustRegister(sendEventCounter)
registry.MustRegister(filterOutEventCounter)

Check warning on line 121 in cdc/cdc/kv/metrics.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/kv/metrics.go#L121

Added line #L121 was not covered by tests
registry.MustRegister(clientChannelSize)
registry.MustRegister(clientRegionTokenSize)
registry.MustRegister(cachedRegionSize)
Expand Down
32 changes: 26 additions & 6 deletions cdc/cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,20 @@ func (rsm *regionStateManager) delState(regionID uint64) {

type regionWorkerMetrics struct {
// kv events related metrics
metricReceivedEventSize prometheus.Observer
metricDroppedEventSize prometheus.Observer
metricReceivedEventSize prometheus.Observer
metricDroppedEventSize prometheus.Observer

metricPullEventInitializedCounter prometheus.Counter
metricPullEventPrewriteCounter prometheus.Counter
metricPullEventCommitCounter prometheus.Counter
metricPullEventCommittedCounter prometheus.Counter
metricPullEventRollbackCounter prometheus.Counter
metricSendEventResolvedCounter prometheus.Counter
metricSendEventCommitCounter prometheus.Counter
metricSendEventCommittedCounter prometheus.Counter

metricSendEventResolvedCounter prometheus.Counter
metricSendEventCommitCounter prometheus.Counter
metricSendEventCommittedCounter prometheus.Counter

metricFilterOutEventCommittedCounter prometheus.Counter

// TODO: add region runtime related metrics
}
Expand Down Expand Up @@ -156,9 +160,11 @@ type regionWorker struct {

enableOldValue bool
storeAddr string

eventFilter *util.KvFilter
}

func newRegionWorker(s *eventFeedSession, addr string) *regionWorker {
func newRegionWorker(s *eventFeedSession, addr string, eventFilter *util.KvFilter) *regionWorker {
cfg := config.GetGlobalServerConfig().KVClient
worker := &regionWorker{
session: s,
Expand All @@ -171,6 +177,7 @@ func newRegionWorker(s *eventFeedSession, addr string) *regionWorker {
enableOldValue: s.enableOldValue,
storeAddr: addr,
concurrent: cfg.WorkerConcurrent,
eventFilter: eventFilter,
}
return worker
}
Expand All @@ -190,6 +197,7 @@ func (w *regionWorker) initMetrics(ctx context.Context) {
metrics.metricSendEventResolvedCounter = sendEventCounter.WithLabelValues("native-resolved", captureAddr, changefeedID)
metrics.metricSendEventCommitCounter = sendEventCounter.WithLabelValues("commit", captureAddr, changefeedID)
metrics.metricSendEventCommittedCounter = sendEventCounter.WithLabelValues("committed", captureAddr, changefeedID)
metrics.metricFilterOutEventCommittedCounter = filterOutEventCounter.WithLabelValues("committed", captureAddr, changefeedID)

w.metrics = metrics
}
Expand Down Expand Up @@ -655,6 +663,18 @@ func (w *regionWorker) handleEventEntry(
}
case cdcpb.Event_COMMITTED:
w.metrics.metricPullEventCommittedCounter.Inc()

if w.eventFilter != nil {
matched, err := w.eventFilter.EventMatch(entry)

Check warning on line 668 in cdc/cdc/kv/region_worker.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/kv/region_worker.go#L668

Added line #L668 was not covered by tests
// EventMatch will return error when fail to decode key.
// Pass such entry to be handled by following steps.
if err == nil && !matched {
w.metrics.metricFilterOutEventCommittedCounter.Inc()
log.Debug("handleEventEntry: event is filter out and drop", zap.String("OpType", entry.OpType.String()), zap.String("key", hex.EncodeToString(entry.Key)))
continue

Check warning on line 674 in cdc/cdc/kv/region_worker.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/kv/region_worker.go#L671-L674

Added lines #L671 - L674 were not covered by tests
}
}

revent, err := assembleRowEvent(regionID, entry, w.enableOldValue)
if err != nil {
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion cdc/cdc/processor/pipeline/keyspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func NewKeySpanPipeline(ctx cdcContext.Context,

sinkNode := newSinkNode(keyspanID, sink, replicaInfo.StartTs, targetTs, flowController)

p.AppendNode(ctx, "puller", newPullerNode(keyspanID, replicaInfo))
p.AppendNode(ctx, "puller", newPullerNode(keyspanID, replicaInfo, replConfig.Filter))

Check warning on line 199 in cdc/cdc/processor/pipeline/keyspan.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/processor/pipeline/keyspan.go#L199

Added line #L199 was not covered by tests
p.AppendNode(ctx, "sorter", sorterNode)
p.AppendNode(ctx, "sink", sinkNode)

Expand Down
9 changes: 8 additions & 1 deletion cdc/cdc/processor/pipeline/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,25 @@ type pullerNode struct {
keyspanName string
keyspan regionspan.Span
replicaInfo *model.KeySpanReplicaInfo
eventFilter *util.KvFilter
cancel context.CancelFunc
wg *errgroup.Group
}

func newPullerNode(
keyspanID model.KeySpanID, replicaInfo *model.KeySpanReplicaInfo,
keyspanID model.KeySpanID, replicaInfo *model.KeySpanReplicaInfo, filterConfig *util.KvFilterConfig,
) pipeline.Node {
keyspan := regionspan.Span{Start: replicaInfo.Start, End: replicaInfo.End}
var filter *util.KvFilter
if filterConfig != nil {
filter = util.CreateFilter(filterConfig)

Check warning on line 46 in cdc/cdc/processor/pipeline/puller.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/processor/pipeline/puller.go#L44-L46

Added lines #L44 - L46 were not covered by tests
}
return &pullerNode{
keyspanID: keyspanID,
keyspanName: keyspan.Name(),
keyspan: keyspan,
replicaInfo: replicaInfo,
eventFilter: filter,

Check warning on line 53 in cdc/cdc/processor/pipeline/puller.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/processor/pipeline/puller.go#L53

Added line #L53 was not covered by tests
}
}

Expand All @@ -63,6 +69,7 @@ func (n *pullerNode) InitWithWaitGroup(ctx pipeline.NodeContext, wg *errgroup.Gr
ctxC = util.PutKeySpanInfoInCtx(ctxC, n.keyspanID, n.keyspanName)
ctxC = util.PutCaptureAddrInCtx(ctxC, ctx.GlobalVars().CaptureInfo.AdvertiseAddr)
ctxC = util.PutChangefeedIDInCtx(ctxC, ctx.ChangefeedVars().ID)
ctxC = util.PutEventFilterInCtx(ctxC, n.eventFilter)

Check warning on line 72 in cdc/cdc/processor/pipeline/puller.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/processor/pipeline/puller.go#L72

Added line #L72 was not covered by tests

plr := puller.NewPuller(ctxC, ctx.GlobalVars().PDClient, ctx.GlobalVars().GrpcPool, ctx.GlobalVars().RegionCache, ctx.GlobalVars().KVStorage,
n.replicaInfo.StartTs, n.keyspans(), true)
Expand Down
7 changes: 7 additions & 0 deletions cdc/metrics/grafana/tikv-cdc.json
Original file line number Diff line number Diff line change
Expand Up @@ -4837,6 +4837,13 @@
"intervalFactor": 1,
"legendFormat": "{{instance}}-{{type}}",
"refId": "A"
},
{
"expr": "sum(rate(tikv_cdc_kvclient_filter_out_event_count{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (instance, type)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "filter-out-{{instance}}-{{type}}",
"refId": "B"
}
],
"thresholds": [],
Expand Down
11 changes: 10 additions & 1 deletion cdc/pkg/cmd/cli/cli_changefeed_create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ func (s *changefeedSuite) TestStrictDecodeConfig(c *check.C) {
dir := c.MkDir()
path := filepath.Join(dir, "config.toml")
content := `
check-gc-safe-point = true`
check-gc-safe-point = true
[filter]
key-prefix = "key\\x00"
key-pattern = "key\\x00pattern"
value-pattern = "value\\ffpattern"
`
err := os.WriteFile(path, []byte(content), 0o644)
c.Assert(err, check.IsNil)

Expand All @@ -49,6 +54,10 @@ func (s *changefeedSuite) TestStrictDecodeConfig(c *check.C) {
cfg := config.GetDefaultReplicaConfig()
err = o.strictDecodeConfig("cdc", cfg)
c.Assert(err, check.IsNil)
c.Assert(cfg.CheckGCSafePoint, check.IsTrue)
c.Assert(cfg.Filter.KeyPrefix, check.Equals, `key\x00`)
c.Assert(cfg.Filter.KeyPattern, check.Equals, `key\x00pattern`)
c.Assert(cfg.Filter.ValuePattern, check.Equals, `value\ffpattern`)

path = filepath.Join(dir, "config1.toml")
content = `
Expand Down
6 changes: 5 additions & 1 deletion cdc/pkg/cmd/cli/cli_changefeed_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"strings"

"github.com/tikv/migration/cdc/pkg/config"
"github.com/tikv/migration/cdc/pkg/etcd"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -158,8 +159,11 @@ func (o *updateChangefeedOptions) applyChanges(oldInfo *model.ChangeFeedInfo, cm
case "sink-uri":
newInfo.SinkURI = o.commonChangefeedOptions.sinkURI
case "config":
if newInfo.Config == nil {
newInfo.Config = &config.ReplicaConfig{}
}
cfg := newInfo.Config
if err = o.commonChangefeedOptions.strictDecodeConfig("TiCDC changefeed", cfg); err != nil {
if err = o.commonChangefeedOptions.strictDecodeConfig("TiKV-CDC changefeed", cfg); err != nil {
log.Error("decode config file error", zap.Error(err))
}
case "opts":
Expand Down
22 changes: 21 additions & 1 deletion cdc/pkg/cmd/cli/cli_changefeed_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package cli

import (
"fmt"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -43,6 +44,26 @@ func (s *changefeedUpdateSuite) TestApplyChanges(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(newInfo.SinkURI, check.Equals, "mysql://root@downstream-tidb:4000")

// Test update config file
oldInfo = &model.ChangeFeedInfo{}
dir := c.MkDir()
path := filepath.Join(dir, "config.toml")
content := `
[filter]
key-prefix = "key\\x00"
key-pattern = "key\\x00pattern"
value-pattern = "value\\ffpattern"
`
err = os.WriteFile(path, []byte(content), 0o644)
c.Assert(err, check.IsNil)
c.Assert(cmd.ParseFlags([]string{fmt.Sprintf("--config=%s", path)}), check.IsNil)
newInfo, err = o.applyChanges(oldInfo, cmd)
c.Assert(err, check.IsNil)
filterCnf := newInfo.Config.Filter
c.Assert(filterCnf.KeyPrefix, check.Equals, `key\x00`)
c.Assert(filterCnf.KeyPattern, check.Equals, `key\x00pattern`)
c.Assert(filterCnf.ValuePattern, check.Equals, `value\ffpattern`)

// Test for cli command flags that should be ignored.
oldInfo = &model.ChangeFeedInfo{SortDir: "."}
c.Assert(cmd.ParseFlags([]string{"--interact"}), check.IsNil)
Expand All @@ -63,7 +84,6 @@ func (s *changefeedUpdateSuite) TestApplyChanges(c *check.C) {
c.Assert(newInfo.EndKey, check.Equals, "")
c.Assert(newInfo.Format, check.Equals, "hex")

dir := c.MkDir()
filename := filepath.Join(dir, "log.txt")
reset, err := initTestLogger(filename)
defer reset()
Expand Down
10 changes: 10 additions & 0 deletions cdc/pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ const (
"scheduler": {
"type": "keyspan-number",
"polling-time": -1
},
"filter": {
"key-prefix": "prefix",
"key-pattern": "key\\x00pattern",
"value-pattern": "value\\ffpattern"
}
}`

Expand All @@ -151,6 +156,11 @@ const (
"scheduler": {
"type": "keyspan-number",
"polling-time": -1
},
"filter": {
"key-prefix": "prefix",
"key-pattern": "key\\x00pattern",
"value-pattern": "value\\ffpattern"
}
}`
)
16 changes: 12 additions & 4 deletions cdc/pkg/config/replica_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"

"github.com/tikv/migration/cdc/pkg/config/outdated"
"github.com/tikv/migration/cdc/pkg/util"

"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand All @@ -39,10 +40,11 @@ var defaultReplicaConfig = &ReplicaConfig{
type ReplicaConfig replicaConfig

type replicaConfig struct {
EnableOldValue bool `toml:"enable-old-value" json:"enable-old-value"`
CheckGCSafePoint bool `toml:"check-gc-safe-point" json:"check-gc-safe-point"`
Sink *SinkConfig `toml:"sink" json:"sink"`
Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"`
EnableOldValue bool `toml:"enable-old-value" json:"enable-old-value"`
CheckGCSafePoint bool `toml:"check-gc-safe-point" json:"check-gc-safe-point"`
Sink *SinkConfig `toml:"sink" json:"sink"`
Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"`
Filter *util.KvFilterConfig `toml:"filter" json:"filter"`
}

// Marshal returns the json marshal format of a ReplicationConfig
Expand Down Expand Up @@ -113,6 +115,12 @@ func (c *ReplicaConfig) Validate() error {
return err
}
}
if c.Filter != nil {
err := c.Filter.Validate()
if err != nil {
return err

Check warning on line 121 in cdc/pkg/config/replica_config.go

View check run for this annotation

Codecov / codecov/patch

cdc/pkg/config/replica_config.go#L119-L121

Added lines #L119 - L121 were not covered by tests
}
}
return nil
}

Expand Down
6 changes: 6 additions & 0 deletions cdc/pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"github.com/tikv/migration/cdc/pkg/util"
)

func mustIdentJSON(t *testing.T, j string) string {
Expand All @@ -38,6 +39,11 @@ func TestReplicaConfigMarshal(t *testing.T) {
Columns: []string{"a", "b"},
},
}
conf.Filter = &util.KvFilterConfig{
KeyPrefix: `prefix`,
KeyPattern: `key\x00pattern`,
ValuePattern: `value\ffpattern`,
}
b, err := conf.Marshal()
require.Nil(t, err)
require.Equal(t, testCfgTestReplicaConfigMarshal1, mustIdentJSON(t, b))
Expand Down
13 changes: 13 additions & 0 deletions cdc/pkg/util/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
ctxKeyIsOwner = ctxKey("isOwner")
ctxKeyTimezone = ctxKey("timezone")
ctxKeyKVStorage = ctxKey("kvStorage")
ctxEventFilter = ctxKey("eventFilter")
)

// CaptureAddrFromCtx returns a capture ID stored in the specified context.
Expand Down Expand Up @@ -121,6 +122,18 @@ func PutChangefeedIDInCtx(ctx context.Context, changefeedID string) context.Cont
return context.WithValue(ctx, ctxKeyChangefeedID, changefeedID)
}

func EventFilterFromCtx(ctx context.Context) *KvFilter {
filter, ok := ctx.Value(ctxEventFilter).(*KvFilter)
if !ok {
return nil

Check warning on line 128 in cdc/pkg/util/ctx.go

View check run for this annotation

Codecov / codecov/patch

cdc/pkg/util/ctx.go#L125-L128

Added lines #L125 - L128 were not covered by tests
}
return filter

Check warning on line 130 in cdc/pkg/util/ctx.go

View check run for this annotation

Codecov / codecov/patch

cdc/pkg/util/ctx.go#L130

Added line #L130 was not covered by tests
}

func PutEventFilterInCtx(ctx context.Context, filter *KvFilter) context.Context {
return context.WithValue(ctx, ctxEventFilter, filter)

Check warning on line 134 in cdc/pkg/util/ctx.go

View check run for this annotation

Codecov / codecov/patch

cdc/pkg/util/ctx.go#L133-L134

Added lines #L133 - L134 were not covered by tests
}

// ZapFieldCapture returns a zap field containing capture address
// TODO: log redact for capture address
func ZapFieldCapture(ctx context.Context) zap.Field {
Expand Down
Loading

0 comments on commit ba225c0

Please sign in to comment.