From 9b4dc22d140cb80be5ade48dc83dd6f4b7c7271d Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Fri, 25 Aug 2023 13:28:37 +0800 Subject: [PATCH] [close #348] support filter for key & value (#349) * support filter for key & value Signed-off-by: Ping Yu * add tests Signed-off-by: Ping Yu * polish ut Signed-off-by: Ping Yu * rename KvFilter Signed-off-by: Ping Yu * add it Signed-off-by: Ping Yu * polish Signed-off-by: Ping Yu * fix ut Signed-off-by: Ping Yu * fix metrics Signed-off-by: Ping Yu * fix CI error Signed-off-by: Ping Yu * decode user key for filter Signed-off-by: Ping Yu --------- Signed-off-by: Ping Yu --- cdc/Makefile | 4 +- cdc/cdc/http_status_test.go | 2 +- cdc/cdc/kv/client.go | 4 +- cdc/cdc/kv/metrics.go | 8 ++ cdc/cdc/kv/region_worker.go | 32 ++++- cdc/cdc/processor/pipeline/keyspan.go | 2 +- cdc/cdc/processor/pipeline/puller.go | 9 +- cdc/metrics/grafana/tikv-cdc.json | 7 + cdc/pkg/cmd/cli/cli_changefeed_create_test.go | 11 +- cdc/pkg/cmd/cli/cli_changefeed_update.go | 6 +- cdc/pkg/cmd/cli/cli_changefeed_update_test.go | 22 ++- cdc/pkg/config/config_test_data.go | 10 ++ cdc/pkg/config/replica_config.go | 16 ++- cdc/pkg/config/replica_config_test.go | 6 + cdc/pkg/util/ctx.go | 13 ++ cdc/pkg/util/kv_filter.go | 121 ++++++++++++++++ cdc/pkg/util/kv_filter_test.go | 132 ++++++++++++++++++ .../integration_tests/_utils/check_total_kvs | 47 +++++++ .../kv_filter/conf/changefeed.toml | 3 + cdc/tests/integration_tests/kv_filter/run.sh | 49 +++++++ cdc/tests/integration_tests/run_group.sh | 2 +- cdc/tests/utils/rawkv_data/checksum.go | 42 ++++++ cdc/tests/utils/rawkv_data/gen_data.go | 10 +- cdc/tests/utils/rawkv_data/main.go | 1 + 24 files changed, 535 insertions(+), 24 deletions(-) create mode 100644 cdc/pkg/util/kv_filter.go create mode 100644 cdc/pkg/util/kv_filter_test.go create mode 100755 cdc/tests/integration_tests/_utils/check_total_kvs create mode 100644 cdc/tests/integration_tests/kv_filter/conf/changefeed.toml create mode 100644 cdc/tests/integration_tests/kv_filter/run.sh diff --git a/cdc/Makefile b/cdc/Makefile index e152f0bf..bd606032 100644 --- a/cdc/Makefile +++ b/cdc/Makefile @@ -19,6 +19,8 @@ ifeq (${CDC_ENABLE_VENDOR}, 1) GOVENDORFLAG := -mod=vendor endif +BUILD_FLAG := -buildvcs=false + GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -trimpath $(GOVENDORFLAG) GOBUILD_DEBUG := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -gcflags "all=-N -l" $(GOVENDORFLAG) GOBUILDNOVENDOR := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -trimpath @@ -43,7 +45,7 @@ FAILPOINT_DISABLE := $$(echo $(FAILPOINT_DIR) | xargs $(FAILPOINT) disable >/dev RELEASE_VERSION ?= ifeq ($(RELEASE_VERSION),) - RELEASE_VERSION := v1.1-master + RELEASE_VERSION := v1.2-master release_version_regex := ^cdc-v[0-9]\..*$$ release_branch_regex := "^cdc-[0-9]\.[0-9].*$$|^HEAD$$|^.*/*tags/cdc-v[0-9]\.[0-9]\..*$$" ifneq ($(shell git rev-parse --abbrev-ref HEAD | egrep $(release_branch_regex)),) diff --git a/cdc/cdc/http_status_test.go b/cdc/cdc/http_status_test.go index 762391e6..b6a3b346 100644 --- a/cdc/cdc/http_status_test.go +++ b/cdc/cdc/http_status_test.go @@ -316,7 +316,7 @@ func (s *httpStatusSuite) TestServerTLSWithCommonName(c *check.C) { resp.Body.Close() return nil }, retry.WithMaxTries(retryTime), retry.WithBackoffBaseDelay(50), retry.WithIsRetryableErr(cerrors.IsRetryableError)) - c.Assert(strings.Contains(err.Error(), "remote error: tls: bad certificate"), check.IsTrue) + c.Assert(strings.Contains(err.Error(), "remote error: tls: "), check.IsTrue) // test cli sends request with a cert will success err = retry.Do(ctx, func() error { diff --git a/cdc/cdc/kv/client.go b/cdc/cdc/kv/client.go index e3b5cb2d..88d0984a 100644 --- a/cdc/cdc/kv/client.go +++ b/cdc/cdc/kv/client.go @@ -1110,9 +1110,11 @@ func (s *eventFeedSession) receiveFromStream( changefeedID := util.ChangefeedIDFromCtx(ctx) metricSendEventBatchResolvedSize := batchResolvedEventSize.WithLabelValues(captureAddr, changefeedID) + eventFilter := util.EventFilterFromCtx(ctx) + // always create a new region worker, because `receiveFromStreamV2` is ensured // to call exactly once from outter code logic - worker := newRegionWorker(s, addr) + worker := newRegionWorker(s, addr, eventFilter) defer worker.evictAllRegions() diff --git a/cdc/cdc/kv/metrics.go b/cdc/cdc/kv/metrics.go index c76908d8..a8401a71 100644 --- a/cdc/cdc/kv/metrics.go +++ b/cdc/cdc/kv/metrics.go @@ -58,6 +58,13 @@ var ( Name: "pull_event_count", Help: "event count received by this puller", }, []string{"type", "capture", "changefeed"}) + filterOutEventCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tikv_cdc", + Subsystem: "kvclient", + Name: "filter_out_event_count", + Help: "event count filtered out by event filter", + }, []string{"type", "capture", "changefeed"}) sendEventCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "tikv_cdc", @@ -111,6 +118,7 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(eventFeedGauge) registry.MustRegister(pullEventCounter) registry.MustRegister(sendEventCounter) + registry.MustRegister(filterOutEventCounter) registry.MustRegister(clientChannelSize) registry.MustRegister(clientRegionTokenSize) registry.MustRegister(cachedRegionSize) diff --git a/cdc/cdc/kv/region_worker.go b/cdc/cdc/kv/region_worker.go index dc2f6df9..34237916 100644 --- a/cdc/cdc/kv/region_worker.go +++ b/cdc/cdc/kv/region_worker.go @@ -108,16 +108,20 @@ func (rsm *regionStateManager) delState(regionID uint64) { type regionWorkerMetrics struct { // kv events related metrics - metricReceivedEventSize prometheus.Observer - metricDroppedEventSize prometheus.Observer + metricReceivedEventSize prometheus.Observer + metricDroppedEventSize prometheus.Observer + metricPullEventInitializedCounter prometheus.Counter metricPullEventPrewriteCounter prometheus.Counter metricPullEventCommitCounter prometheus.Counter metricPullEventCommittedCounter prometheus.Counter metricPullEventRollbackCounter prometheus.Counter - metricSendEventResolvedCounter prometheus.Counter - metricSendEventCommitCounter prometheus.Counter - metricSendEventCommittedCounter prometheus.Counter + + metricSendEventResolvedCounter prometheus.Counter + metricSendEventCommitCounter prometheus.Counter + metricSendEventCommittedCounter prometheus.Counter + + metricFilterOutEventCommittedCounter prometheus.Counter // TODO: add region runtime related metrics } @@ -156,9 +160,11 @@ type regionWorker struct { enableOldValue bool storeAddr string + + eventFilter *util.KvFilter } -func newRegionWorker(s *eventFeedSession, addr string) *regionWorker { +func newRegionWorker(s *eventFeedSession, addr string, eventFilter *util.KvFilter) *regionWorker { cfg := config.GetGlobalServerConfig().KVClient worker := ®ionWorker{ session: s, @@ -171,6 +177,7 @@ func newRegionWorker(s *eventFeedSession, addr string) *regionWorker { enableOldValue: s.enableOldValue, storeAddr: addr, concurrent: cfg.WorkerConcurrent, + eventFilter: eventFilter, } return worker } @@ -190,6 +197,7 @@ func (w *regionWorker) initMetrics(ctx context.Context) { metrics.metricSendEventResolvedCounter = sendEventCounter.WithLabelValues("native-resolved", captureAddr, changefeedID) metrics.metricSendEventCommitCounter = sendEventCounter.WithLabelValues("commit", captureAddr, changefeedID) metrics.metricSendEventCommittedCounter = sendEventCounter.WithLabelValues("committed", captureAddr, changefeedID) + metrics.metricFilterOutEventCommittedCounter = filterOutEventCounter.WithLabelValues("committed", captureAddr, changefeedID) w.metrics = metrics } @@ -655,6 +663,18 @@ func (w *regionWorker) handleEventEntry( } case cdcpb.Event_COMMITTED: w.metrics.metricPullEventCommittedCounter.Inc() + + if w.eventFilter != nil { + matched, err := w.eventFilter.EventMatch(entry) + // EventMatch will return error when fail to decode key. + // Pass such entry to be handled by following steps. + if err == nil && !matched { + w.metrics.metricFilterOutEventCommittedCounter.Inc() + log.Debug("handleEventEntry: event is filter out and drop", zap.String("OpType", entry.OpType.String()), zap.String("key", hex.EncodeToString(entry.Key))) + continue + } + } + revent, err := assembleRowEvent(regionID, entry, w.enableOldValue) if err != nil { return errors.Trace(err) diff --git a/cdc/cdc/processor/pipeline/keyspan.go b/cdc/cdc/processor/pipeline/keyspan.go index bb4beedf..e22b890e 100644 --- a/cdc/cdc/processor/pipeline/keyspan.go +++ b/cdc/cdc/processor/pipeline/keyspan.go @@ -196,7 +196,7 @@ func NewKeySpanPipeline(ctx cdcContext.Context, sinkNode := newSinkNode(keyspanID, sink, replicaInfo.StartTs, targetTs, flowController) - p.AppendNode(ctx, "puller", newPullerNode(keyspanID, replicaInfo)) + p.AppendNode(ctx, "puller", newPullerNode(keyspanID, replicaInfo, replConfig.Filter)) p.AppendNode(ctx, "sorter", sorterNode) p.AppendNode(ctx, "sink", sinkNode) diff --git a/cdc/cdc/processor/pipeline/puller.go b/cdc/cdc/processor/pipeline/puller.go index 07f4fc2b..11647041 100644 --- a/cdc/cdc/processor/pipeline/puller.go +++ b/cdc/cdc/processor/pipeline/puller.go @@ -32,19 +32,25 @@ type pullerNode struct { keyspanName string keyspan regionspan.Span replicaInfo *model.KeySpanReplicaInfo + eventFilter *util.KvFilter cancel context.CancelFunc wg *errgroup.Group } func newPullerNode( - keyspanID model.KeySpanID, replicaInfo *model.KeySpanReplicaInfo, + keyspanID model.KeySpanID, replicaInfo *model.KeySpanReplicaInfo, filterConfig *util.KvFilterConfig, ) pipeline.Node { keyspan := regionspan.Span{Start: replicaInfo.Start, End: replicaInfo.End} + var filter *util.KvFilter + if filterConfig != nil { + filter = util.CreateFilter(filterConfig) + } return &pullerNode{ keyspanID: keyspanID, keyspanName: keyspan.Name(), keyspan: keyspan, replicaInfo: replicaInfo, + eventFilter: filter, } } @@ -63,6 +69,7 @@ func (n *pullerNode) InitWithWaitGroup(ctx pipeline.NodeContext, wg *errgroup.Gr ctxC = util.PutKeySpanInfoInCtx(ctxC, n.keyspanID, n.keyspanName) ctxC = util.PutCaptureAddrInCtx(ctxC, ctx.GlobalVars().CaptureInfo.AdvertiseAddr) ctxC = util.PutChangefeedIDInCtx(ctxC, ctx.ChangefeedVars().ID) + ctxC = util.PutEventFilterInCtx(ctxC, n.eventFilter) plr := puller.NewPuller(ctxC, ctx.GlobalVars().PDClient, ctx.GlobalVars().GrpcPool, ctx.GlobalVars().RegionCache, ctx.GlobalVars().KVStorage, n.replicaInfo.StartTs, n.keyspans(), true) diff --git a/cdc/metrics/grafana/tikv-cdc.json b/cdc/metrics/grafana/tikv-cdc.json index 0bf84f7b..6ff82cc0 100644 --- a/cdc/metrics/grafana/tikv-cdc.json +++ b/cdc/metrics/grafana/tikv-cdc.json @@ -4837,6 +4837,13 @@ "intervalFactor": 1, "legendFormat": "{{instance}}-{{type}}", "refId": "A" + }, + { + "expr": "sum(rate(tikv_cdc_kvclient_filter_out_event_count{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\"}[1m])) by (instance, type)", + "format": "time_series", + "intervalFactor": 1, + "legendFormat": "filter-out-{{instance}}-{{type}}", + "refId": "B" } ], "thresholds": [], diff --git a/cdc/pkg/cmd/cli/cli_changefeed_create_test.go b/cdc/pkg/cmd/cli/cli_changefeed_create_test.go index b8b4a62e..0aad9bf5 100644 --- a/cdc/pkg/cmd/cli/cli_changefeed_create_test.go +++ b/cdc/pkg/cmd/cli/cli_changefeed_create_test.go @@ -40,7 +40,12 @@ func (s *changefeedSuite) TestStrictDecodeConfig(c *check.C) { dir := c.MkDir() path := filepath.Join(dir, "config.toml") content := ` - check-gc-safe-point = true` + check-gc-safe-point = true + [filter] + key-prefix = "key\\x00" + key-pattern = "key\\x00pattern" + value-pattern = "value\\ffpattern" + ` err := os.WriteFile(path, []byte(content), 0o644) c.Assert(err, check.IsNil) @@ -49,6 +54,10 @@ func (s *changefeedSuite) TestStrictDecodeConfig(c *check.C) { cfg := config.GetDefaultReplicaConfig() err = o.strictDecodeConfig("cdc", cfg) c.Assert(err, check.IsNil) + c.Assert(cfg.CheckGCSafePoint, check.IsTrue) + c.Assert(cfg.Filter.KeyPrefix, check.Equals, `key\x00`) + c.Assert(cfg.Filter.KeyPattern, check.Equals, `key\x00pattern`) + c.Assert(cfg.Filter.ValuePattern, check.Equals, `value\ffpattern`) path = filepath.Join(dir, "config1.toml") content = ` diff --git a/cdc/pkg/cmd/cli/cli_changefeed_update.go b/cdc/pkg/cmd/cli/cli_changefeed_update.go index 607b41a3..b78a6645 100644 --- a/cdc/pkg/cmd/cli/cli_changefeed_update.go +++ b/cdc/pkg/cmd/cli/cli_changefeed_update.go @@ -17,6 +17,7 @@ import ( "fmt" "strings" + "github.com/tikv/migration/cdc/pkg/config" "github.com/tikv/migration/cdc/pkg/etcd" "github.com/pingcap/errors" @@ -158,8 +159,11 @@ func (o *updateChangefeedOptions) applyChanges(oldInfo *model.ChangeFeedInfo, cm case "sink-uri": newInfo.SinkURI = o.commonChangefeedOptions.sinkURI case "config": + if newInfo.Config == nil { + newInfo.Config = &config.ReplicaConfig{} + } cfg := newInfo.Config - if err = o.commonChangefeedOptions.strictDecodeConfig("TiCDC changefeed", cfg); err != nil { + if err = o.commonChangefeedOptions.strictDecodeConfig("TiKV-CDC changefeed", cfg); err != nil { log.Error("decode config file error", zap.Error(err)) } case "opts": diff --git a/cdc/pkg/cmd/cli/cli_changefeed_update_test.go b/cdc/pkg/cmd/cli/cli_changefeed_update_test.go index a24a4ef0..389ecd13 100644 --- a/cdc/pkg/cmd/cli/cli_changefeed_update_test.go +++ b/cdc/pkg/cmd/cli/cli_changefeed_update_test.go @@ -14,6 +14,7 @@ package cli import ( + "fmt" "os" "path/filepath" "strings" @@ -43,6 +44,26 @@ func (s *changefeedUpdateSuite) TestApplyChanges(c *check.C) { c.Assert(err, check.IsNil) c.Assert(newInfo.SinkURI, check.Equals, "mysql://root@downstream-tidb:4000") + // Test update config file + oldInfo = &model.ChangeFeedInfo{} + dir := c.MkDir() + path := filepath.Join(dir, "config.toml") + content := ` + [filter] + key-prefix = "key\\x00" + key-pattern = "key\\x00pattern" + value-pattern = "value\\ffpattern" + ` + err = os.WriteFile(path, []byte(content), 0o644) + c.Assert(err, check.IsNil) + c.Assert(cmd.ParseFlags([]string{fmt.Sprintf("--config=%s", path)}), check.IsNil) + newInfo, err = o.applyChanges(oldInfo, cmd) + c.Assert(err, check.IsNil) + filterCnf := newInfo.Config.Filter + c.Assert(filterCnf.KeyPrefix, check.Equals, `key\x00`) + c.Assert(filterCnf.KeyPattern, check.Equals, `key\x00pattern`) + c.Assert(filterCnf.ValuePattern, check.Equals, `value\ffpattern`) + // Test for cli command flags that should be ignored. oldInfo = &model.ChangeFeedInfo{SortDir: "."} c.Assert(cmd.ParseFlags([]string{"--interact"}), check.IsNil) @@ -63,7 +84,6 @@ func (s *changefeedUpdateSuite) TestApplyChanges(c *check.C) { c.Assert(newInfo.EndKey, check.Equals, "") c.Assert(newInfo.Format, check.Equals, "hex") - dir := c.MkDir() filename := filepath.Join(dir, "log.txt") reset, err := initTestLogger(filename) defer reset() diff --git a/cdc/pkg/config/config_test_data.go b/cdc/pkg/config/config_test_data.go index d3491ce9..916f0ee8 100644 --- a/cdc/pkg/config/config_test_data.go +++ b/cdc/pkg/config/config_test_data.go @@ -127,6 +127,11 @@ const ( "scheduler": { "type": "keyspan-number", "polling-time": -1 + }, + "filter": { + "key-prefix": "prefix", + "key-pattern": "key\\x00pattern", + "value-pattern": "value\\ffpattern" } }` @@ -151,6 +156,11 @@ const ( "scheduler": { "type": "keyspan-number", "polling-time": -1 + }, + "filter": { + "key-prefix": "prefix", + "key-pattern": "key\\x00pattern", + "value-pattern": "value\\ffpattern" } }` ) diff --git a/cdc/pkg/config/replica_config.go b/cdc/pkg/config/replica_config.go index 1460d48d..dec0cc9b 100644 --- a/cdc/pkg/config/replica_config.go +++ b/cdc/pkg/config/replica_config.go @@ -18,6 +18,7 @@ import ( "fmt" "github.com/tikv/migration/cdc/pkg/config/outdated" + "github.com/tikv/migration/cdc/pkg/util" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -39,10 +40,11 @@ var defaultReplicaConfig = &ReplicaConfig{ type ReplicaConfig replicaConfig type replicaConfig struct { - EnableOldValue bool `toml:"enable-old-value" json:"enable-old-value"` - CheckGCSafePoint bool `toml:"check-gc-safe-point" json:"check-gc-safe-point"` - Sink *SinkConfig `toml:"sink" json:"sink"` - Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"` + EnableOldValue bool `toml:"enable-old-value" json:"enable-old-value"` + CheckGCSafePoint bool `toml:"check-gc-safe-point" json:"check-gc-safe-point"` + Sink *SinkConfig `toml:"sink" json:"sink"` + Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"` + Filter *util.KvFilterConfig `toml:"filter" json:"filter"` } // Marshal returns the json marshal format of a ReplicationConfig @@ -113,6 +115,12 @@ func (c *ReplicaConfig) Validate() error { return err } } + if c.Filter != nil { + err := c.Filter.Validate() + if err != nil { + return err + } + } return nil } diff --git a/cdc/pkg/config/replica_config_test.go b/cdc/pkg/config/replica_config_test.go index 4fb12c86..93da46a7 100644 --- a/cdc/pkg/config/replica_config_test.go +++ b/cdc/pkg/config/replica_config_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "github.com/tikv/migration/cdc/pkg/util" ) func mustIdentJSON(t *testing.T, j string) string { @@ -38,6 +39,11 @@ func TestReplicaConfigMarshal(t *testing.T) { Columns: []string{"a", "b"}, }, } + conf.Filter = &util.KvFilterConfig{ + KeyPrefix: `prefix`, + KeyPattern: `key\x00pattern`, + ValuePattern: `value\ffpattern`, + } b, err := conf.Marshal() require.Nil(t, err) require.Equal(t, testCfgTestReplicaConfigMarshal1, mustIdentJSON(t, b)) diff --git a/cdc/pkg/util/ctx.go b/cdc/pkg/util/ctx.go index 027dfedc..abae5825 100644 --- a/cdc/pkg/util/ctx.go +++ b/cdc/pkg/util/ctx.go @@ -31,6 +31,7 @@ const ( ctxKeyIsOwner = ctxKey("isOwner") ctxKeyTimezone = ctxKey("timezone") ctxKeyKVStorage = ctxKey("kvStorage") + ctxEventFilter = ctxKey("eventFilter") ) // CaptureAddrFromCtx returns a capture ID stored in the specified context. @@ -121,6 +122,18 @@ func PutChangefeedIDInCtx(ctx context.Context, changefeedID string) context.Cont return context.WithValue(ctx, ctxKeyChangefeedID, changefeedID) } +func EventFilterFromCtx(ctx context.Context) *KvFilter { + filter, ok := ctx.Value(ctxEventFilter).(*KvFilter) + if !ok { + return nil + } + return filter +} + +func PutEventFilterInCtx(ctx context.Context, filter *KvFilter) context.Context { + return context.WithValue(ctx, ctxEventFilter, filter) +} + // ZapFieldCapture returns a zap field containing capture address // TODO: log redact for capture address func ZapFieldCapture(ctx context.Context) zap.Field { diff --git a/cdc/pkg/util/kv_filter.go b/cdc/pkg/util/kv_filter.go new file mode 100644 index 00000000..f3cd4835 --- /dev/null +++ b/cdc/pkg/util/kv_filter.go @@ -0,0 +1,121 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "bytes" + "encoding/hex" + "fmt" + "io" + "regexp" + + "github.com/pingcap/kvproto/pkg/cdcpb" + "github.com/pingcap/log" + "go.uber.org/zap" + "golang.org/x/net/html/charset" +) + +type KvFilterConfig struct { + // Binary data is specified in escaped format, e.g. \x00\x01 + KeyPrefix string `toml:"key-prefix" json:"key-prefix"` + KeyPattern string `toml:"key-pattern" json:"key-pattern"` + ValuePattern string `toml:"value-pattern" json:"value-pattern"` +} + +func (c *KvFilterConfig) Validate() error { + if c.KeyPrefix != "" { + if _, err := ParseKey("escaped", c.KeyPrefix); err != nil { + return fmt.Errorf("invalid key-prefix: %s", err.Error()) + } + } + if c.KeyPattern != "" { + if _, err := regexp.Compile(c.KeyPattern); err != nil { + return fmt.Errorf("invalid key-pattern: %s", err.Error()) + } + } + + if c.ValuePattern != "" { + if _, err := regexp.Compile(c.ValuePattern); err != nil { + return fmt.Errorf("invalid value-pattern: %s", err.Error()) + } + } + + return nil +} + +type KvFilter struct { + keyPrefix []byte + keyPattern *regexp.Regexp + valuePattern *regexp.Regexp +} + +func CreateFilter(conf *KvFilterConfig) *KvFilter { + var ( + keyPrefix []byte + keyPattern *regexp.Regexp + valuePattern *regexp.Regexp + ) + + if conf.KeyPrefix != "" { + keyPrefix, _ = ParseKey("escaped", conf.KeyPrefix) + } + if conf.KeyPattern != "" { + keyPattern = regexp.MustCompile(conf.KeyPattern) + } + if conf.ValuePattern != "" { + valuePattern = regexp.MustCompile(conf.ValuePattern) + } + + return &KvFilter{ + keyPrefix: keyPrefix, + keyPattern: keyPattern, + valuePattern: valuePattern, + } +} + +// Key of entry is expected to be in RawKV APIv2 format. +// Return error if not. +func (f *KvFilter) EventMatch(entry *cdcpb.Event_Row) (bool, error) { + // Filter on put & delete only. + if entry.GetOpType() != cdcpb.Event_Row_DELETE && entry.GetOpType() != cdcpb.Event_Row_PUT { + return true, nil + } + + userKey, err := DecodeV2Key(entry.Key) + if err != nil { + log.Warn("Unexpected key not in RawKV V2 format", zap.String("entry.Key", hex.EncodeToString(entry.Key)), zap.Error(err)) + return false, err + } + + if len(f.keyPrefix) > 0 && !bytes.HasPrefix(userKey, f.keyPrefix) { + return false, nil + } + if f.keyPattern != nil && !f.keyPattern.MatchString(ConvertToUTF8(userKey, "latin1")) { + return false, nil + } + if entry.GetOpType() == cdcpb.Event_Row_PUT && + f.valuePattern != nil && + !f.valuePattern.MatchString(ConvertToUTF8(entry.GetValue(), "latin1")) { + return false, nil + } + + return true, nil +} + +func ConvertToUTF8(strBytes []byte, origEncoding string) string { + byteReader := bytes.NewReader(strBytes) + reader, _ := charset.NewReaderLabel(origEncoding, byteReader) + strBytes, _ = io.ReadAll(reader) + return string(strBytes) +} diff --git a/cdc/pkg/util/kv_filter_test.go b/cdc/pkg/util/kv_filter_test.go new file mode 100644 index 00000000..14d806f5 --- /dev/null +++ b/cdc/pkg/util/kv_filter_test.go @@ -0,0 +1,132 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "testing" + + "github.com/pingcap/kvproto/pkg/cdcpb" + "github.com/stretchr/testify/require" +) + +func TestKvFilterConfig(t *testing.T) { + t.Parallel() + conf := KvFilterConfig{} + require.Nil(t, conf.Validate()) + + conf = KvFilterConfig{} + conf.KeyPrefix = `prefix\x00\x11\\` + conf.KeyPattern = `key\x00pattern` + conf.ValuePattern = `value\ffpattern` + require.Nil(t, conf.Validate()) + + conf = KvFilterConfig{} + conf.KeyPattern = "\xfd\xe2" // invalid utf8 + require.Error(t, conf.Validate()) + + conf = KvFilterConfig{} + conf.KeyPrefix = `\zz` // invalid escaped + require.Error(t, conf.Validate()) +} + +func TestKvFilterMatch(t *testing.T) { + assert := require.New(t) + + entry := cdcpb.Event_Row{ + OpType: cdcpb.Event_Row_PUT, + Key: []byte("r\x00\x00\x00key\x00\x11pattern"), + Value: []byte("value\xaa\xffpattern"), + } + + type testCase struct { + pattern string + match bool + } + + keyPrefixCases := []testCase{ + {`key\x00`, true}, + {`key\x00\x11pattern`, true}, + {`key\x00\x11pattern\x00`, false}, + {`key\x01\x11pattern`, false}, + } + for _, c := range keyPrefixCases { + conf := KvFilterConfig{KeyPrefix: c.pattern} + filter := CreateFilter(&conf) + matched, err := filter.EventMatch(&entry) + assert.NoError(err) + assert.Equalf(c.match, matched, "pattern: %s", c.pattern) + } + + keyPatternCases := []testCase{ + {`key\x00`, true}, + {`key\x00\x11pattern`, true}, + {`key\x00\x11pattern\x00`, false}, + {`key\x01\x11pattern`, false}, + {`\x00\x11`, true}, + {`\x11`, true}, + {`\x10`, false}, + {`\x00[\x00\x11]`, true}, + {`\x00.?pattern`, true}, + } + for _, c := range keyPatternCases { + conf := KvFilterConfig{KeyPattern: c.pattern} + filter := CreateFilter(&conf) + matched, err := filter.EventMatch(&entry) + assert.NoError(err) + assert.Equalf(c.match, matched, "pattern: %s", c.pattern) + } + + valuePatternCases := []testCase{ + {`value[\xaa-\xff]+pattern`, true}, + {`value[\xaa-\xbb]+pattern`, false}, + } + for _, c := range valuePatternCases { + conf := KvFilterConfig{ValuePattern: c.pattern} + filter := CreateFilter(&conf) + matched, err := filter.EventMatch(&entry) + assert.NoError(err) + assert.Equalf(c.match, matched, "pattern: %s", c.pattern) + } + + // delete entry + { + entry := cdcpb.Event_Row{ + OpType: cdcpb.Event_Row_DELETE, + Key: []byte("r\x00\x00\x00key\x00\x11pattern"), + Value: []byte(""), + } + conf := KvFilterConfig{ + KeyPrefix: "key\x00", + KeyPattern: `key.*pattern`, + ValuePattern: `value`, + } + filter := CreateFilter(&conf) + matched, err := filter.EventMatch(&entry) + assert.NoError(err) + assert.True(matched) + } + + // not RawKV V2 key + { + entry := cdcpb.Event_Row{ + OpType: cdcpb.Event_Row_DELETE, + Key: []byte("k"), + Value: []byte(""), + } + conf := KvFilterConfig{} + filter := CreateFilter(&conf) + _, err := filter.EventMatch(&entry) + assert.Error(err) + } +} diff --git a/cdc/tests/integration_tests/_utils/check_total_kvs b/cdc/tests/integration_tests/_utils/check_total_kvs new file mode 100755 index 00000000..afdce835 --- /dev/null +++ b/cdc/tests/integration_tests/_utils/check_total_kvs @@ -0,0 +1,47 @@ +#!/bin/bash +# parameter 1: work directory +# parameter 2: dst pd +# parameter 3: total kvs +# parameter 4: max check times + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +TLS_DIR=$(cd $CUR/../_certificates && pwd) + +workdir=$1 +DOWN_PD=$2 +TOTAL_KVS=$3 +if [ $# -ge 4 ]; then + check_time=$4 +else + check_time=50 +fi +PWD=$(pwd) + +if ! command -v rawkv_data &>/dev/null; then + cd $CUR/../../.. + make rawkv_data + cd $PWD +fi +set +e + +cd $workdir +i=0 +while [ $i -lt $check_time ]; do + rm -rf $workdir/rawkv_data/ + rawkv_data totalkvs --dst-pd $DOWN_PD --count $TOTAL_KVS --ca-path=$TLS_DIR/ca.pem --cert-path=$TLS_DIR/client.pem --key-path=$TLS_DIR/client-key.pem + ret=$? + if [ "$ret" == 0 ]; then + echo "check total-kvs successfully" + break + fi + ((i++)) + echo "check total-kvs failed $i-th time, retry later" + sleep 3 +done + +if [ $i -ge $check_time ]; then + echo "check total-kvs failed at last" + exit 1 +fi + +cd $PWD diff --git a/cdc/tests/integration_tests/kv_filter/conf/changefeed.toml b/cdc/tests/integration_tests/kv_filter/conf/changefeed.toml new file mode 100644 index 00000000..c3a1f4ff --- /dev/null +++ b/cdc/tests/integration_tests/kv_filter/conf/changefeed.toml @@ -0,0 +1,3 @@ +[filter] +key-prefix = "indexInfo_:_pf01_:" +key-pattern = "_APD0101_:_0{15}[3-9]" diff --git a/cdc/tests/integration_tests/kv_filter/run.sh b/cdc/tests/integration_tests/kv_filter/run.sh new file mode 100644 index 00000000..c5404e1b --- /dev/null +++ b/cdc/tests/integration_tests/kv_filter/run.sh @@ -0,0 +1,49 @@ +#!/bin/bash + +set -euo pipefail + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=tikv-cdc.test +SINK_TYPE=$1 +UP_PD=http://$UP_PD_HOST_1:$UP_PD_PORT_1 +DOWN_PD=http://$DOWN_PD_HOST:$DOWN_PD_PORT + +function run() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + start_tidb_cluster --workdir $WORK_DIR + cd $WORK_DIR + + start_ts=$(get_start_ts $UP_PD) + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + case $SINK_TYPE in + tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;; + *) SINK_URI="" ;; + esac + + uuid="custom-changefeed-name" + tikv-cdc cli changefeed create \ + --start-ts=$start_ts \ + --sink-uri="$SINK_URI" \ + --changefeed-id="$uuid" \ + --config $CUR/conf/changefeed.toml + + rawkv_op $UP_PD put 5000 + + # Filter configured in $CUR/conf/changefeed.toml will match events with key >= 3000 + # So wait for sync finished, pause changefeed, delete keys < 3000 for upstream, then check_sync_diff + sleep 1 && check_total_kvs $WORK_DIR $DOWN_PD 2000 + run_cdc_cli changefeed --changefeed-id $uuid pause + rawkv_op $UP_PD delete 3000 + + check_sync_diff $WORK_DIR $UP_PD $DOWN_PD + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/cdc/tests/integration_tests/run_group.sh b/cdc/tests/integration_tests/run_group.sh index 780432c2..1a28d552 100755 --- a/cdc/tests/integration_tests/run_group.sh +++ b/cdc/tests/integration_tests/run_group.sh @@ -11,7 +11,7 @@ group=$1 # https://github.com/PingCAP-QE/ci/blob/main/pipelines/tikv/migration/latest/pull_integration_test.groovy declare -A groups groups=( - ["G00"]='autorandom' + ["G00"]='autorandom kv_filter' ["G01"]='capture_session_done_during_task cdc_hang_on' ["G02"]='changefeed_auto_stop changefeed_error changefeed_fast_fail' ["G03"]='changefeed_finish changefeed_pause_resume changefeed_reconstruct' diff --git a/cdc/tests/utils/rawkv_data/checksum.go b/cdc/tests/utils/rawkv_data/checksum.go index 6f7579ab..8f89edbb 100644 --- a/cdc/tests/utils/rawkv_data/checksum.go +++ b/cdc/tests/utils/rawkv_data/checksum.go @@ -81,3 +81,45 @@ func runChecksum(cmd *cobra.Command) error { fmt.Printf("Upstream checksum %v are same with downstream %v\n", srcChecksum, dstChecksum) return nil } + +func NewTotalKvsCommand() *cobra.Command { + return &cobra.Command{ + Use: "totalkvs", + Short: "Verify that the total number of key-values of downstream is equal to --count argument", + SilenceUsage: true, + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, _ []string) error { + return runTotalKvs(cmd) + }, + } +} + +func runTotalKvs(cmd *cobra.Command) error { + cfg := &Config{} + err := cfg.ParseFromFlags(cmd.Flags(), true) + if err != nil { + return err + } + ctx := context.Background() + + dstCli, err := rawkv.NewClientWithOpts(ctx, cfg.DstPD, + rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), + rawkv.WithSecurity(cfg.DstSec)) + if err != nil { + return err + } + defer dstCli.Close() + + dstChecksum, err := dstCli.Checksum(ctx, nil, nil) + if err != nil { + return err + } + + if dstChecksum.TotalKvs != uint64(cfg.Count) { + msg := fmt.Sprintf("Downstream total kvs %v is not equal to expected %v", dstChecksum, cfg.Count) + log.Info(msg) + return fmt.Errorf(msg) + } + fmt.Printf("Downstream total kvs %v equals to expected %v", dstChecksum, cfg.Count) + return nil +} diff --git a/cdc/tests/utils/rawkv_data/gen_data.go b/cdc/tests/utils/rawkv_data/gen_data.go index 786f1aef..56eb9395 100644 --- a/cdc/tests/utils/rawkv_data/gen_data.go +++ b/cdc/tests/utils/rawkv_data/gen_data.go @@ -33,11 +33,11 @@ func generateTestData(keyIndex int) (key, value0, value1 []byte) { key = []byte(fmt.Sprintf("indexInfo_:_pf01_:_APD0101_:_%019d", keyIndex)) value0 = []byte{} // Don't assign nil, which means "NotFound" in CompareAndSwap if keyIndex%100 != 42 { // To generate test data with empty value. See https://github.com/tikv/migration/issues/250 - value0 = []byte(fmt.Sprintf("v0_%020d", keyIndex)) + value0 = []byte(fmt.Sprintf("v0_%020d_%v", keyIndex, rand.Uint64())) } value1 = []byte{} if keyIndex%100 != 43 { - value1 = []byte(fmt.Sprintf("v1_%020d%020d", keyIndex, keyIndex)) + value1 = []byte(fmt.Sprintf("v1_%020d%020d_%v", keyIndex, keyIndex, rand.Uint64())) } return key, value0, value1 } @@ -169,7 +169,7 @@ func runPutCmd(cmd *cobra.Command) error { endIdx := startIdx + count1 + count2 kvCntPerBatch := 512 for startIdx1 < endIdx { - batchCnt := min(kvCntPerBatch, endIdx-startIdx) + batchCnt := min(kvCntPerBatch, endIdx-startIdx1) keys, values0, values1 := batchGenerateData(startIdx1, batchCnt) err := cli.BatchPut(ctx, keys, values0) if err != nil { @@ -179,7 +179,7 @@ func runPutCmd(cmd *cobra.Command) error { if err != nil { return err } - startIdx1 += kvCntPerBatch + startIdx1 += batchCnt } return nil }) @@ -199,7 +199,7 @@ func runPutCmd(cmd *cobra.Command) error { return err } if !ret && !bytes.Equal(preValue, value1) { - return errors.Errorf("CAS put data error: preValue: %v, ret: %v, value0: %v, value1: %v", preValue, ret, value0, value1) + return errors.Errorf("CAS put data error: preValue: %v, ret: %v, value0: %v, value1: %v", string(preValue), ret, string(value0), string(value1)) } } return nil diff --git a/cdc/tests/utils/rawkv_data/main.go b/cdc/tests/utils/rawkv_data/main.go index 84276915..eb00aca2 100644 --- a/cdc/tests/utils/rawkv_data/main.go +++ b/cdc/tests/utils/rawkv_data/main.go @@ -129,6 +129,7 @@ func main() { rootCmd.AddCommand(NewPutCommand()) rootCmd.AddCommand(NewDeleteCommand()) rootCmd.AddCommand(NewChecksumCommand()) + rootCmd.AddCommand(NewTotalKvsCommand()) rootCmd.SetOut(os.Stdout) rootCmd.SetArgs(os.Args[1:])