feat(enterprise): Change data capture (CDC) integration with Kafka #7395

Merged
merged 35 commits into from
Feb 9, 2021
Changes from 16 commits
Commits
9bb08c1
starting work on change data capture
aman-bansal Jan 28, 2021
ea3ba20
cdc event sink using jobs
aman-bansal Jan 29, 2021
df5d5c9
adding cdc proposal
aman-bansal Jan 29, 2021
95a9489
generating protos and fixing proposal
aman-bansal Jan 30, 2021
47151f6
making cdc job based and fixing snapshots
aman-bansal Feb 3, 2021
792b408
fixing oss build
aman-bansal Feb 3, 2021
c915370
fixin alpha config + making snapshots to work with minReadTs
aman-bansal Feb 3, 2021
e89800d
using maxReadTs in case of reset
aman-bansal Feb 3, 2021
f3d886e
intermediate change commit for maxReadTs
aman-bansal Feb 3, 2021
91a029e
adding comments and logic description
aman-bansal Feb 4, 2021
ef50b2f
fixing events for schema
aman-bansal Feb 4, 2021
f2fa780
fixing implementation for ludicrous mode
aman-bansal Feb 4, 2021
c307644
fixing merge conflicts
aman-bansal Feb 4, 2021
c60d465
minor comments and description fix
aman-bansal Feb 4, 2021
02693d3
adding test case for change data capture
aman-bansal Feb 4, 2021
104bbd2
fixing based on reviews
aman-bansal Feb 5, 2021
c7e23e1
setting maxts after successful sending of event
aman-bansal Feb 5, 2021
9201499
renamin sink for more clarity
aman-bansal Feb 5, 2021
1f2304b
Simplify implementation
manishrjain Feb 5, 2021
1fc9715
refactoring cdc and complete implementation
aman-bansal Feb 8, 2021
e695414
making cdc to not send duplicate events in case of leadership change
aman-bansal Feb 8, 2021
2175929
oss install fix
aman-bansal Feb 8, 2021
6cec4b3
managing state for ludicrous mode
aman-bansal Feb 8, 2021
ea68eb6
some cleanup
Feb 8, 2021
76d78e1
removing redundant flags
aman-bansal Feb 8, 2021
916e0ff
removing batch messages code
aman-bansal Feb 8, 2021
7d69d0d
Manish's comments
manishrjain Feb 8, 2021
63f0221
making cdc to fail on ludicrous mode
aman-bansal Feb 9, 2021
288f0b9
small fix
aman-bansal Feb 9, 2021
2e3a65e
Manish's comments
manishrjain Feb 9, 2021
4c7c629
basic fix after manish change
aman-bansal Feb 9, 2021
d1f9bb3
Manish's edits
manishrjain Feb 9, 2021
5b0b905
change event structure + fixing cdc index
aman-bansal Feb 9, 2021
f980339
fixing for DROP operations
aman-bansal Feb 9, 2021
b32e44a
basic refactoring
aman-bansal Feb 9, 2021
21 changes: 18 additions & 3 deletions dgraph/cmd/alpha/run.go
@@ -203,6 +203,20 @@ they form a Raft group and provide synchronous replication.
flag.
Sample flag could look like --audit dir=aa;encrypt_file=/filepath;compress=true`)

	flag.String("cdc", "",
		`Various change data capture options.
	max-recovery=N to define the maximum number of pending txn events by which the sink can
		lag behind the current index in case of sink failure. Default is 10000 pending txn entries.

	file=/path/to/directory where CDC events will be stored.
	kafka=host1,host2 to define a comma-separated list of Kafka hosts.
	sasl-user=username to define the SASL username for Kafka.
	sasl-password=password to define the SASL password for Kafka.
	ca-cert=/path/to/ca/crt/file to define the CA cert for TLS encryption.
	client-cert=/path/to/client/cert/file to define the client certificate for TLS encryption.
	client-key=/path/to/client/key/file to define the client key for TLS encryption.
	`)

// TLS configurations
x.RegisterServerTLSFlags(flag)
}
@@ -618,9 +632,10 @@ func run() {
PIndexCacheSize: pstoreIndexCacheSize,
WalCache: walCache,

MutationsMode: worker.AllowMutations,
AuthToken: Alpha.Conf.GetString("auth_token"),
Audit: conf,
MutationsMode: worker.AllowMutations,
AuthToken: Alpha.Conf.GetString("auth_token"),
Audit: conf,
ChangeDataConf: Alpha.Conf.GetString("cdc"),
}

secretFile := Alpha.Conf.GetString("acl_secret_file")
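The `--cdc` value is passed around as a single string (`ChangeDataConf`), and the test compose file in this PR supplies it as `"max_recovery=10000;file=/cdc"`. The sketch below shows one way such a semicolon-separated `key=value` string could be split into a map. `parseOptions` is a hypothetical helper written for illustration, not the parser Dgraph uses, and it does not reconcile the `max-recovery` spelling in the help text with the `max_recovery` spelling in the compose file.

```go
package main

import (
	"fmt"
	"strings"
)

// parseOptions splits a flag value such as "file=/cdc;kafka=host1,host2"
// into a key/value map. Hypothetical helper, not part of Dgraph.
func parseOptions(s string) (map[string]string, error) {
	opts := make(map[string]string)
	for _, kv := range strings.Split(s, ";") {
		kv = strings.TrimSpace(kv)
		if kv == "" {
			continue // tolerate empty segments such as a trailing ';'
		}
		// Split on the first '=' only, so values may contain '='.
		parts := strings.SplitN(kv, "=", 2)
		if len(parts) != 2 || parts[0] == "" {
			return nil, fmt.Errorf("malformed option %q", kv)
		}
		opts[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
	}
	return opts, nil
}

func main() {
	opts, err := parseOptions("file=/cdc;kafka=host1,host2")
	if err != nil {
		panic(err)
	}
	// The kafka value is itself a comma-separated host list.
	fmt.Println(opts["file"], strings.Split(opts["kafka"], ","))
}
```

Keeping the comma inside the `kafka` value and splitting it in a second step is what lets `;` act as the only top-level separator.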
3 changes: 2 additions & 1 deletion ee/enc/util.go
@@ -19,10 +19,11 @@
package enc

import (
"io"

"github.com/dgraph-io/dgraph/x"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"io"
)

// Eebuild indicates if this is an Enterprise build.
3 changes: 3 additions & 0 deletions ee/utils_ee.go
@@ -40,5 +40,8 @@ func GetEEFeaturesList() []string {
if x.WorkerConfig.Audit {
ee = append(ee, "audit")
}
if worker.Config.ChangeDataConf != "" {
ee = append(ee, "cdc")
}
return ee
}
10 changes: 6 additions & 4 deletions go.mod
@@ -14,6 +14,7 @@ require (
github.com/Masterminds/semver/v3 v3.1.0
github.com/Microsoft/go-winio v0.4.15 // indirect
github.com/OneOfOne/xxhash v1.2.5 // indirect
github.com/Shopify/sarama v1.27.2
github.com/blevesearch/bleve v1.0.13
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
github.com/dgraph-io/badger/v3 v3.0.0-20210125092849-88bf5aab9f50
@@ -36,9 +37,9 @@ require (
github.com/golang/geo v0.0.0-20170810003146-31fb0106dc4a
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.3.5
github.com/golang/snappy v0.0.1
github.com/golang/snappy v0.0.2
github.com/google/codesearch v1.0.0
github.com/google/go-cmp v0.5.0
github.com/google/go-cmp v0.5.2
github.com/google/uuid v1.0.0
github.com/gorilla/websocket v1.4.2
github.com/graph-gophers/graphql-go v0.0.0-20200309224638-dae41bde9ef9
@@ -47,6 +48,7 @@ require (
github.com/minio/minio-go/v6 v6.0.55
github.com/mitchellh/panicwrap v1.0.0
github.com/paulmach/go.geojson v0.0.0-20170327170536-40612a87147b
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
github.com/pkg/errors v0.9.1
github.com/pkg/profile v1.2.1
github.com/prometheus/client_golang v0.9.3
@@ -57,12 +59,12 @@ require (
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.3
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.5.1
github.com/stretchr/testify v1.6.1
github.com/twpayne/go-geom v1.0.5
go.etcd.io/etcd v0.0.0-20190228193606-a943ad0ee4c9
go.opencensus.io v0.22.5
go.uber.org/zap v1.16.0
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a
golang.org/x/net v0.0.0-20201021035429-f5854403a974
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20210105210732-16f7687f5001
56 changes: 54 additions & 2 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions protos/pb.proto
@@ -318,6 +318,7 @@ message Proposal {
uint64 index = 10; // Used to store Raft index, in raft.Ready.
uint64 expected_checksum = 11; // Block an operation until membership reaches this checksum.
RestoreRequest restore = 12;
uint64 cdc_ts = 13;
}

message KVS {
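The diff adds `cdc_ts` to `Proposal` but does not show how it is consumed. The commit messages mention setting the timestamp only after an event is sent successfully and avoiding duplicate events on a leadership change. One plausible reading, sketched below under that assumption, is a replicated watermark: a node that takes over skips every event at or below the last timestamp known to have reached the sink. The `event` and `watermark` types are hypothetical and do not come from the PR.

```go
package main

import "fmt"

// event is a hypothetical stand-in for a committed change.
type event struct {
	CommitTs uint64
	Payload  string
}

// watermark tracks the highest commit timestamp confirmed by the sink.
type watermark struct {
	sentTs uint64
}

// pending returns the events above the watermark, preserving their order.
func (w *watermark) pending(events []event) []event {
	var out []event
	for _, e := range events {
		if e.CommitTs > w.sentTs {
			out = append(out, e)
		}
	}
	return out
}

// advance records ts after a successful send and never moves backwards.
func (w *watermark) advance(ts uint64) {
	if ts > w.sentTs {
		w.sentTs = ts
	}
}

func main() {
	committed := []event{{5, "a"}, {8, "b"}, {12, "c"}}

	// Assume a new leader has learned, through a replicated timestamp,
	// that events up to ts 8 already reached the sink.
	w := &watermark{}
	w.advance(8)

	for _, e := range w.pending(committed) {
		fmt.Println(e.CommitTs, e.Payload) // send to the sink here
		w.advance(e.CommitTs)              // advance only after the send succeeds
	}
}
```

Advancing the watermark only after a successful send trades possible re-delivery after a crash for never skipping an event.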
664 changes: 350 additions & 314 deletions protos/pb/pb.pb.go

Large diffs are not rendered by default.

Empty file added systest/cdc/cdc_logs/.gitkeep
Empty file.
71 changes: 71 additions & 0 deletions systest/cdc/cdc_test.go
@@ -0,0 +1,71 @@
/*
* Copyright 2021 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cdc

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
	"testing"
	"time"

	"github.com/dgraph-io/dgraph/testutil"

	"github.com/stretchr/testify/require"
)

func TestCDC(t *testing.T) {
	defer os.RemoveAll("./cdc_logs/sink.log")
	// Run ten increments; the test expects the sink to record values 1 through 10.
	cmd := exec.Command("dgraph", "increment", "--num", "10",
		"--alpha", testutil.SockAddr)
	if out, err := cmd.CombinedOutput(); err != nil {
		fmt.Println(string(out))
		t.Fatal(err)
	}
	// Give the alpha time to write the events to the file sink.
	time.Sleep(time.Second * 15)
	verifyCDC(t, "./cdc_logs/sink.log")
}

type CDCEvent struct {
	Value struct {
		Event struct {
			Value int `json:"value"`
		} `json:"event"`
	} `json:"value"`
}

func verifyCDC(t *testing.T, path string) {
	abs, err := filepath.Abs(path)
	require.NoError(t, err)
	f, err := os.Open(abs)
	require.NoError(t, err)
	defer f.Close()

	fileScanner := bufio.NewScanner(f)
	iter := 1
	for fileScanner.Scan() {
		l := new(CDCEvent)
		require.NoError(t, json.Unmarshal(fileScanner.Bytes(), l))
		require.Equal(t, iter, l.Value.Event.Value)
		iter++
	}
	require.NoError(t, fileScanner.Err())
	// Ten events are expected, so iter ends at 11.
	require.Equal(t, 11, iter)
}
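The check in `verifyCDC` can be exercised without a running cluster. The sketch below applies the same idea to an in-memory buffer: one JSON object per line, with the nested `value.event.value` field expected to count up from 1. The sample lines are made up for illustration and carry only the fields the test struct reads; `countSequential` is a hypothetical name.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
)

// cdcEvent mirrors only the fields the test inspects.
type cdcEvent struct {
	Value struct {
		Event struct {
			Value int `json:"value"`
		} `json:"event"`
	} `json:"value"`
}

// countSequential reads one JSON event per line and checks that the
// nested value counts up from 1. It returns the number of valid events.
func countSequential(data []byte) (int, error) {
	sc := bufio.NewScanner(bytes.NewReader(data))
	n := 0
	for sc.Scan() {
		var ev cdcEvent
		if err := json.Unmarshal(sc.Bytes(), &ev); err != nil {
			return n, err
		}
		if got := ev.Value.Event.Value; got != n+1 {
			return n, fmt.Errorf("line %d: got value %d, want %d", n+1, got, n+1)
		}
		n++
	}
	return n, sc.Err()
}

func main() {
	// Made-up sample input in the shape the test struct expects.
	sample := []byte(`{"value":{"event":{"value":1}}}` + "\n" +
		`{"value":{"event":{"value":2}}}` + "\n")
	n, err := countSequential(sample)
	fmt.Println(n, err)
}
```

Returning the count alongside the error makes it easy to report how far the stream was valid when a gap or duplicate appears.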
36 changes: 36 additions & 0 deletions systest/cdc/docker-compose.yml
@@ -0,0 +1,36 @@
version: "3.5"
services:
  alpha1:
    image: dgraph/dgraph:latest
    working_dir: /data/alpha1
    labels:
      cluster: test
    ports:
      - "8080"
      - "9080"
    volumes:
      - type: bind
        source: $GOPATH/bin
        target: /gobin
        read_only: true
      - type: bind
        source: ./cdc_logs
        target: /cdc
    command: /gobin/dgraph alpha --my=alpha1:7080 --zero=zero1:5080 --logtostderr
      --cdc "max_recovery=10000;file=/cdc" -v=2
      --whitelist=10.0.0.0/8,172.16.0.0/12,192.168.0.0/16
  zero1:
    image: dgraph/dgraph:latest
    working_dir: /data/zero1
    labels:
      cluster: test
    ports:
      - "5080"
      - "6080"
    volumes:
      - type: bind
        source: $GOPATH/bin
        target: /gobin
        read_only: true
    command: /gobin/dgraph zero --my=zero1:5080 --logtostderr -v=2 --bindall
volumes: {}
47 changes: 47 additions & 0 deletions worker/cdc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// +build oss

/*
* Copyright 2021 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package worker

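import (
	"math"
)

// CDC is an empty placeholder in OSS builds.
type CDC struct {
}

// newCDC returns nil in OSS builds. The methods below never dereference
// the receiver, so calling them on a nil *CDC is safe.
func newCDC() *CDC {
	return nil
}

// getTs reports the maximum timestamp, since the OSS build tracks no CDC progress.
func (cd *CDC) getTs() uint64 {
	return math.MaxUint64
}

// updateTs is a no-op in OSS builds.
func (cd *CDC) updateTs(ts uint64) {
}

// Close is a no-op in OSS builds.
func (cd *CDC) Close() {
}

// TODO: test cases for old cluster restart, live loader, bulk loader, backup restore, etc.
func (cd *CDC) processCDCEvents() {
}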
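The OSS stub relies on a Go property worth spelling out: a method with a pointer receiver can be called on a nil pointer as long as it never dereferences the receiver. The sketch below reproduces that pattern in isolation. `minTs` is a hypothetical caller, added only to show why returning `math.MaxUint64` would keep a disabled CDC from lowering a minimum taken over timestamps; the PR does not show the real call site.

```go
package main

import (
	"fmt"
	"math"
)

// CDC mirrors the empty OSS stub type.
type CDC struct{}

// newCDC returns nil, as the OSS stub does.
func newCDC() *CDC { return nil }

// getTs never touches cd, so it is safe to call on a nil receiver.
func (cd *CDC) getTs() uint64 { return math.MaxUint64 }

// minTs is a hypothetical caller that picks the lower of a snapshot
// timestamp and the CDC timestamp.
func minTs(snapshotTs uint64, cd *CDC) uint64 {
	if ts := cd.getTs(); ts < snapshotTs {
		return ts
	}
	return snapshotTs
}

func main() {
	cd := newCDC()
	// cd is nil, yet the method call is valid; the snapshot timestamp wins.
	fmt.Println(cd == nil, minTs(42, cd)) // prints "true 42"
}
```

This lets shared code call the CDC methods unconditionally, with the build tag deciding whether they do any work.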