Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[to #323] *: Remove tidb dependency #367

Merged
merged 3 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions cdc/cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/v3/concurrency"
Expand Down Expand Up @@ -61,7 +60,7 @@
election *concurrency.Election

pdClient pd.Client
kvStorage tidbkv.Storage
kvStorage tikv.Storage
createEtcdClient createEtcdClientFunc
etcdClient *etcd.CDCEtcdClient
grpcPool kv.GrpcPool
Expand All @@ -75,7 +74,7 @@
}

// NewCapture returns a new Capture instance
func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, createEtcdClient createEtcdClientFunc) *Capture {
func NewCapture(pdClient pd.Client, kvStorage tikv.Storage, createEtcdClient createEtcdClientFunc) *Capture {

Check warning on line 77 in cdc/cdc/capture/capture.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/capture/capture.go#L77

Added line #L77 was not covered by tests
return &Capture{
pdClient: pdClient,
kvStorage: kvStorage,
Expand Down
4 changes: 2 additions & 2 deletions cdc/cdc/capture/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@

"github.com/gin-gonic/gin"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/httputil"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/migration/cdc/cdc/model"
"github.com/tikv/migration/cdc/cdc/owner"
"github.com/tikv/migration/cdc/pkg/config"
cerror "github.com/tikv/migration/cdc/pkg/errors"
"github.com/tikv/migration/cdc/pkg/httputil"
"github.com/tikv/migration/cdc/pkg/logutil"
"github.com/tikv/migration/cdc/pkg/retry"
"github.com/tikv/migration/cdc/pkg/version"
Expand Down Expand Up @@ -782,7 +782,7 @@
}

// forward to owner
cli := httputil.NewClient(tslConfig)
cli := httputil.NewClientByTLSConfig(tslConfig)

Check warning on line 785 in cdc/cdc/capture/http_handler.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/capture/http_handler.go#L785

Added line #L785 was not covered by tests
resp, err := cli.Do(req)
if err != nil {
_ = c.Error(err)
Expand Down
6 changes: 3 additions & 3 deletions cdc/cdc/http_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import (

"github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/httputil"
"github.com/tikv/migration/cdc/cdc/capture"
"github.com/tikv/migration/cdc/cdc/model"
"github.com/tikv/migration/cdc/pkg/config"
cerrors "github.com/tikv/migration/cdc/pkg/errors"
"github.com/tikv/migration/cdc/pkg/httputil"
"github.com/tikv/migration/cdc/pkg/retry"
security2 "github.com/tikv/migration/cdc/pkg/security"
"github.com/tikv/migration/cdc/pkg/util/testleak"
Expand Down Expand Up @@ -246,7 +246,7 @@ func (s *httpStatusSuite) TestServerTLSWithoutCommonName(c *check.C) {
if err != nil {
c.Assert(err, check.IsNil)
}
cli := httputil.NewClient(tlsConfig)
cli := httputil.NewClientByTLSConfig(tlsConfig)
resp, err := cli.Get(statusURL)
if err != nil {
return err
Expand Down Expand Up @@ -324,7 +324,7 @@ func (s *httpStatusSuite) TestServerTLSWithCommonName(c *check.C) {
if err != nil {
c.Assert(err, check.IsNil)
}
cli := httputil.NewClient(tlsConfig)
cli := httputil.NewClientByTLSConfig(tlsConfig)
resp, err := cli.Get(statusURL)
if err != nil {
return err
Expand Down
6 changes: 2 additions & 4 deletions cdc/cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore/mockcopr"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -399,9 +398,8 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) {
cluster.ChangeLeader(3, 5)

ts, err := kvStorage.CurrentTimestamp(oracle.GlobalTxnScope)
ver := kv.NewVersion(ts)
c.Assert(err, check.IsNil)
ch2 <- makeEvent(ver.Ver)
ch2 <- makeEvent(ts)
var event model.RegionFeedEvent
// consume the first resolved ts event, which is sent before region starts
<-eventCh
Expand All @@ -417,7 +415,7 @@ func (s *clientSuite) TestConnectOfflineTiKV(c *check.C) {
case <-time.After(time.Second):
c.Fatalf("reconnection not succeed in 1 second")
}
checkEvent(event, GetSafeResolvedTs(ver.Ver))
checkEvent(event, GetSafeResolvedTs(ts))

// check gRPC connection active counter is updated correctly
bucket, ok := grpcPool.bucketConns[invalidStore]
Expand Down
17 changes: 9 additions & 8 deletions cdc/cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@
log.Warn("failed to get current version from PD", zap.Error(err))
continue
}
currentTimeFromPD := oracle.GetTimeFromTS(version.Ver)
currentTimeFromPD := oracle.GetTimeFromTS(version)
expired := make([]*regionTsInfo, 0)
for w.rtsManager.Len() > 0 {
item := w.rtsManager.Pop()
Expand All @@ -330,7 +330,7 @@
if len(expired) == 0 {
continue
}
maxVersion := oracle.ComposeTS(oracle.GetPhysical(currentTimeFromPD.Add(-10*time.Second)), 0)
// maxVersion := oracle.ComposeTS(oracle.GetPhysical(currentTimeFromPD.Add(-10*time.Second)), 0)
for _, rts := range expired {
state, ok := w.getRegionState(rts.regionID)
if !ok || state.isStopped() {
Expand Down Expand Up @@ -359,18 +359,19 @@
w.rtsManager.Upsert(rts)
continue
}
log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock",
log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time",

Check warning on line 362 in cdc/cdc/kv/region_worker.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/kv/region_worker.go#L362

Added line #L362 was not covered by tests
zap.Uint64("regionID", rts.regionID),
zap.Stringer("span", state.getRegionSpan()),
zap.Duration("duration", sinceLastResolvedTs),
zap.Duration("lastEvent", sinceLastEvent),
zap.Uint64("resolvedTs", lastResolvedTs),
)
err = w.session.lockResolver.Resolve(ctx, rts.regionID, maxVersion)
if err != nil {
log.Warn("failed to resolve lock", zap.Uint64("regionID", rts.regionID), zap.Error(err))
continue
}
// Resolve locks for RawKV is not necessary. Add it back after we support TxnKV.
// err = w.session.lockResolver.Resolve(ctx, rts.regionID, maxVersion)
// if err != nil {
// log.Warn("failed to resolve lock", zap.Uint64("regionID", rts.regionID), zap.Error(err))
// continue
// }
rts.ts.penalty = 0
}
rts.ts.resolvedTs = lastResolvedTs
Expand Down
37 changes: 10 additions & 27 deletions cdc/cdc/kv/store_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,15 @@
package kv

import (
"fmt"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
tidbconfig "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/driver"
tikvconfig "github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv"
"github.com/tikv/migration/cdc/cdc/model"
cerror "github.com/tikv/migration/cdc/pkg/errors"
"github.com/tikv/migration/cdc/pkg/flags"
Expand All @@ -38,7 +33,7 @@
// TiKVStorage is the tikv storage interface used by CDC.
type TiKVStorage interface {
tikv.Storage
GetCachedCurrentVersion() (version tidbkv.Version, err error)
GetCachedCurrentVersion() (version uint64, err error)
}

const (
Expand Down Expand Up @@ -80,7 +75,7 @@
}

// GetCachedCurrentVersion gets the cached version of currentVersion, and update the cache if necessary
func (s *StorageWithCurVersionCache) GetCachedCurrentVersion() (version tidbkv.Version, err error) {
func (s *StorageWithCurVersionCache) GetCachedCurrentVersion() (version uint64, err error) {
curVersionCacheMu.Lock()
entry, exists := curVersionCache[s.cacheKey]
curVersionCacheMu.Unlock()
Expand All @@ -99,42 +94,30 @@
if err != nil {
return
}
ver := kv.NewVersion(ts)
entry.ts = ver.Ver
entry.ts = ts
entry.lastUpdated = time.Now()
}

version.Ver = entry.ts
version = entry.ts
return
}

// GetSnapshotMeta returns tidb meta information
// TODO: Simplify the signature of this function
func GetSnapshotMeta(tiStore tidbkv.Storage, ts uint64) (*meta.Meta, error) {
snapshot := tiStore.GetSnapshot(tidbkv.NewVersion(ts))
return meta.NewSnapshotMeta(snapshot), nil
}

// CreateTiStore creates a new tikv storage client
func CreateTiStore(urls string, credential *security.Credential) (kv.Storage, error) {
func CreateTiStore(urls string, credential *security.Credential) (tikv.Storage, error) {

Check warning on line 106 in cdc/cdc/kv/store_op.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/kv/store_op.go#L106

Added line #L106 was not covered by tests
urlv, err := flags.NewURLsValue(urls)
if err != nil {
return nil, errors.Trace(err)
}

// Ignore error if it is already registered.
_ = store.Register("tikv", driver.TiKVDriver{})

if credential.CAPath != "" {
conf := tidbconfig.GetGlobalConfig()
conf := tikvconfig.GetGlobalConfig()

Check warning on line 113 in cdc/cdc/kv/store_op.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/kv/store_op.go#L113

Added line #L113 was not covered by tests
conf.Security.ClusterSSLCA = credential.CAPath
conf.Security.ClusterSSLCert = credential.CertPath
conf.Security.ClusterSSLKey = credential.KeyPath
tidbconfig.StoreGlobalConfig(conf)
tikvconfig.StoreGlobalConfig(conf)

Check warning on line 117 in cdc/cdc/kv/store_op.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/kv/store_op.go#L117

Added line #L117 was not covered by tests
}

tiPath := fmt.Sprintf("tikv://%s?disableGC=true", urlv.HostString())
tiStore, err := store.New(tiPath)
tiStore, err := txnkv.NewClient(urlv.HostList())

Check warning on line 120 in cdc/cdc/kv/store_op.go

View check run for this annotation

Codecov / codecov/patch

cdc/cdc/kv/store_op.go#L120

Added line #L120 was not covered by tests
if err != nil {
return nil, cerror.WrapError(cerror.ErrNewStore, err)
}
Expand Down
Loading
Loading