Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[close #232] Add https for tikv sink #233

Merged
merged 12 commits into from
Sep 22, 2022
Merged
1 change: 1 addition & 0 deletions cdc/cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (m *feedStateManager) PushAdminJob(job *model.AdminJob) {

func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
job := m.popAdminJob()

if job == nil || job.CfID != m.state.ID {
return false
}
Expand Down
16 changes: 13 additions & 3 deletions cdc/cdc/sink/tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,17 @@ func (k *tikvSink) runWorker(ctx context.Context, workerIdx uint32) error {

func parseTiKVUri(sinkURI *url.URL, opts map[string]string) (*tikvconfig.Config, []string, error) {
config := tikvconfig.DefaultConfig()
pdAddrPrefix := "http://"

if sinkURI.Query().Get("ca-path") != "" {
config.Security = tikvconfig.NewSecurity(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could CA CERT and KEY be set by config file?

sinkURI.Query().Get("ca-path"),
sinkURI.Query().Get("cert-path"),
sinkURI.Query().Get("key-path"),
nil,
)
pdAddrPrefix = "https://"
}

pdAddr := strings.Split(sinkURI.Host, ",")
if len(pdAddr) > 0 {
Expand All @@ -434,11 +445,10 @@ func parseTiKVUri(sinkURI *url.URL, opts map[string]string) (*tikvconfig.Config,
err = fmt.Errorf("Invalid pd addr: %v, err: %v", addr, err)
return nil, nil, cerror.WrapError(cerror.ErrTiKVInvalidConfig, err)
}
// TODO: support https
pdAddr[i] = "http://" + addr
pdAddr[i] = pdAddrPrefix + addr
}
} else {
pdAddr = append(pdAddr, "http://127.0.0.1:2379")
pdAddr = append(pdAddr, pdAddrPrefix+"127.0.0.1:2379")
}

s := sinkURI.Query().Get("concurrency")
Expand Down
35 changes: 25 additions & 10 deletions cdc/cdc/sink/tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,34 @@ func TestExtractRawKVEntry(t *testing.T) {

func TestTiKVSinkConfig(t *testing.T) {
defer testleak.AfterTestT(t)()

require := require.New(t)

uri := "tikv://127.0.0.1:1001,127.0.0.2:1002/?concurrency=10"
sinkURI, err := url.Parse(uri)
require.NoError(err)
cases := []string{
"tikv://127.0.0.1:1001,127.0.0.2:1002,127.0.0.1:1003/?concurrency=12",
"tikv://127.0.0.1:1001,127.0.0.1:1002/?concurrency=10&ca-path=./ca-cert.pem&cert-path=./client-cert.pem&key-path=./client-key",
}

opts := make(map[string]string)
_, pdAddr, err := parseTiKVUri(sinkURI, opts)
require.NoError(err)
require.Len(pdAddr, 2)
require.Equal([]string{"http://127.0.0.1:1001", "http://127.0.0.2:1002"}, pdAddr)
require.Equal("10", opts["concurrency"])
expected := []struct {
pdAddr []string
concurrency string
security tikvconfig.Security
}{
{[]string{"http://127.0.0.1:1001", "http://127.0.0.2:1002", "http://127.0.0.1:1003"}, "12", tikvconfig.Security{}},
{[]string{"https://127.0.0.1:1001", "https://127.0.0.1:1002"}, "10", tikvconfig.NewSecurity("./ca-cert.pem", "./client-cert.pem", "./client-key", nil)},
}

for i, uri := range cases {

sinkURI, err := url.Parse(uri)
require.NoError(err)

opts := make(map[string]string)
config, pdAddr, err := parseTiKVUri(sinkURI, opts)
require.NoError(err)
require.Equal(expected[i].pdAddr, pdAddr)
require.Equal(expected[i].concurrency, opts["concurrency"])
require.Equal(expected[i].security, config.Security)
}
}

func TestTiKVSinkBatcher(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions cdc/tests/integration_tests/_utils/check_sync_diff
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
# parameter 3: dst pd
# parameter 4: max check times

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
TLS_DIR=$(cd $CUR/../_certificates && pwd)

workdir=$1
UP_PD=$2
DOWN_PD=$3
Expand All @@ -15,7 +18,6 @@ fi
PWD=$(pwd)

if ! command -v rawkv_data &>/dev/null; then
CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
cd $CUR/../../..
make rawkv_data
cd $PWD
Expand All @@ -26,7 +28,7 @@ cd $workdir
i=0
while [ $i -lt $check_time ]; do
rm -rf $workdir/rawkv_data/
rawkv_data checksum --src-pd $UP_PD --dst-pd $DOWN_PD
rawkv_data checksum --src-pd $UP_PD --dst-pd $DOWN_PD --ca-path=$TLS_DIR/ca.pem --cert-path=$TLS_DIR/client.pem --key-path=$TLS_DIR/client-key.pem
ret=$?
if [ "$ret" == 0 ]; then
echo "check diff successfully"
Expand Down
5 changes: 3 additions & 2 deletions cdc/tests/integration_tests/_utils/rawkv_op
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
# parameter 2: put/delete
# parameter 3: key count

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
TLS_DIR=$(cd $CUR/../_certificates && pwd)
set -e

if ! command -v rawkv_data &>/dev/null; then
echo "make rawkv_data"
CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
PWD=$(pwd)
cd $CUR/../../..
make rakv_data
cd $PWD
fi

echo "run put data"
rawkv_data $2 --src-pd $1 --count $3
rawkv_data $2 --src-pd $1 --count $3 --ca-path=$TLS_DIR/ca.pem --cert-path=$TLS_DIR/client.pem --key-path=$TLD_DIR/client-key.pem
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,6 @@ while ! mysql -uroot -h${TLS_TIDB_HOST} -P${TLS_TIDB_PORT} --default-character-s
sleep 2
done

run_sql "update mysql.tidb set variable_value='60m' where variable_name='tikv_gc_life_time';" ${TLS_TIDB_HOST} ${TLS_TIDB_PORT} \
--ssl-ca=$TLS_DIR/ca.pem \
--ssl-cert=$TLS_DIR/server.pem \
--ssl-key=$TLS_DIR/server-key.pem

echo "Starting CDC state checker..."
cd $CUR/../../utils/cdc_state_checker
if [ ! -f ./cdc_state_checker ]; then
Expand Down
45 changes: 45 additions & 0 deletions cdc/tests/integration_tests/sink_tls/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/bin/bash

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
TLS_DIR=$(cd $CUR/../_certificates && pwd)
CDC_BINARY=tikv-cdc.test
SINK_TYPE=$1
UP_PD=http://$UP_PD_HOST_1:$UP_PD_PORT_1
DOWN_PD=https://$TLS_PD_HOST:$TLS_PD_PORT

function run() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR
start_tls_tidb_cluster --workdir $WORK_DIR --tlsdir $TLS_DIR

cd $WORK_DIR

# record tso before we create tables to skip the system table DDLs
start_ts=$(tikv-cdc cli tso query --pd=$UP_PD)
sleep 10
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

case $SINK_TYPE in
tikv) SINK_URI="tikv://${TLS_PD_HOST}:${TLS_PD_PORT}/?ca-path=$TLS_DIR/ca.pem&cert-path=$TLS_DIR/client.pem&key-path=$TLS_DIR/client-key.pem" ;;
*) SINK_URI="" ;;
esac

tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri=$SINK_URI

rawkv_op $UP_PD put 10000
check_sync_diff $WORK_DIR $UP_PD $DOWN_PD
rawkv_op $UP_PD delete 10000
check_sync_diff $WORK_DIR $UP_PD $DOWN_PD

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
10 changes: 3 additions & 7 deletions cdc/tests/utils/rawkv_data/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,19 @@ func NewChecksumCommand() *cobra.Command {

func runChecksum(cmd *cobra.Command) error {
cfg := &Config{}
err := cfg.ParseFromFlags(cmd.Flags())
err := cfg.ParseFromFlags(cmd.Flags(), true)
if err != nil {
return err
}
ctx := context.Background()

if cfg.DstPD == "" {
return fmt.Errorf("Downstream cluster PD is not set")
}

srcCli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2))
srcCli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.SrcSec))
if err != nil {
return err
}
defer srcCli.Close()

dstCli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.DstPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2))
dstCli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.DstPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.DstSec))
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions cdc/tests/utils/rawkv_data/gen_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ func NewDeleteCommand() *cobra.Command {

func runPointDelete(cmd *cobra.Command) error {
cfg := &Config{}
err := cfg.ParseFromFlags(cmd.Flags())
err := cfg.ParseFromFlags(cmd.Flags(), false)
if err != nil {
return err
}

ctx := context.Background()
cli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2))
cli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.SrcSec))
if err != nil {
return err
}
Expand Down Expand Up @@ -110,18 +110,18 @@ func runPointDelete(cmd *cobra.Command) error {

func runPointPut(cmd *cobra.Command) error {
cfg := &Config{}
err := cfg.ParseFromFlags(cmd.Flags())
err := cfg.ParseFromFlags(cmd.Flags(), false)
if err != nil {
return err
}
ctx := context.Background()

cli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2))
cli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.SrcSec))
if err != nil {
return err
}
defer cli.Close()
atomicCli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2))
atomicCli, err := rawkv.NewClientWithOpts(ctx, []string{cfg.SrcPD}, rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2), rawkv.WithSecurity(cfg.SrcSec))
if err != nil {
return err
}
Expand Down
58 changes: 53 additions & 5 deletions cdc/tests/utils/rawkv_data/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package main
import (
"fmt"
"os"
"strings"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/tikv/client-go/v2/config"
)

const (
Expand All @@ -30,23 +32,34 @@ const (
flagDstPD = "dst-pd"
flagCount = "count"
flagStartIndex = "start-index"
flagCAPath = "ca-path"
flagCertPath = "cert-path"
flagKeyPath = "key-path"
)

type Config struct {
SrcPD string `json:"src-pd"`
DstPD string `json:"dst_pd"`
StartIndex int `json:"start_index"`
Count int `json:"count"`
SrcPD string `json:"src-pd"`
DstPD string `json:"dst-pd"`
StartIndex int `json:"start-index"`
Count int `json:"count"`
CAPath string `json:"ca-path"`
CertPath string `json:"cert-path"`
KeyPath string `json:"key-path"`
SrcSec config.Security `json:"src-sec"`
DstSec config.Security `json:"dst-sec"`
}

func AddFlags(cmd *cobra.Command) {
cmd.PersistentFlags().String(flagSrcPD, "127.0.0.1:2379", "Upstream PD address")
cmd.PersistentFlags().String(flagDstPD, "", "Downstream PD address")
cmd.PersistentFlags().Int(flagStartIndex, 0, "The start index of generated keys")
cmd.PersistentFlags().Int(flagCount, 1000, "The number of key")
cmd.PersistentFlags().String(flagCAPath, "", "Path to CA certificate")
cmd.PersistentFlags().String(flagCertPath, "", "Path to client certificate")
cmd.PersistentFlags().String(flagKeyPath, "", "Path to client key")
}

func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet, requireDstPD bool) error {
var err error
if cfg.SrcPD, err = flags.GetString(flagSrcPD); err != nil {
return err
Expand All @@ -60,6 +73,41 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
if cfg.Count, err = flags.GetInt(flagCount); err != nil {
return err
}
if cfg.CAPath, err = flags.GetString(flagCAPath); err != nil {
return err
}
if cfg.CertPath, err = flags.GetString(flagCertPath); err != nil {
return err
}
if cfg.KeyPath, err = flags.GetString(flagKeyPath); err != nil {
return err
}

if cfg.SrcPD == "" {
return fmt.Errorf("Upstream cluster PD is not set")
}
if strings.HasPrefix(cfg.SrcPD, "https://") {
if cfg.CAPath == "" || cfg.CertPath == "" || cfg.KeyPath == "" {
return fmt.Errorf("CAPath/CertPath/KeyPath is not set")
}
cfg.SrcSec.ClusterSSLCA = cfg.CAPath
cfg.SrcSec.ClusterSSLCert = cfg.CertPath
cfg.SrcSec.ClusterSSLKey = cfg.KeyPath
}

if requireDstPD {
if cfg.DstPD == "" {
return fmt.Errorf("Downstream cluster PD is not set")
}
if strings.HasPrefix(cfg.DstPD, "https://") {
if cfg.CAPath == "" || cfg.CertPath == "" || cfg.KeyPath == "" {
return fmt.Errorf("CAPath/CertPath/KeyPath is not set")
}
cfg.DstSec.ClusterSSLCA = cfg.CAPath
cfg.DstSec.ClusterSSLCert = cfg.CertPath
cfg.DstSec.ClusterSSLKey = cfg.KeyPath
}
}
return nil
}

Expand Down