
feat(enterpise): Change data capture (CDC) integration with kafka #7395

Merged (35 commits) on Feb 9, 2021

aman-bansal (Contributor) commented Feb 4, 2021

Flag: cdc. Its sub-options for change data capture are:
file=/path/to/directory to define the directory where the file-based sink writes CDC events.
kafka=host1,host2 to define a comma-separated list of Kafka hosts.
sasl-user=username to define the SASL username for Kafka.
sasl-password=password to define the SASL password for Kafka.
ca-cert=/path/to/ca/crt/file to define the CA certificate for TLS encryption.
client-cert=/path/to/client/cert/file to define the client certificate for TLS encryption.
client-key=/path/to/client/key/file to define the client key for TLS encryption.
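A minimal sketch of how such an option string could be parsed into key/value pairs. The `;` separator is an assumption taken from the sample `--audit dir=aa;encrypt_file=/filepath;compress=true` quoted in the review, and `parseCDCOptions` is a hypothetical helper, not Dgraph's actual flag parser.

```go
package main

import (
	"fmt"
	"strings"
)

// parseCDCOptions splits a semicolon-separated option string such as
// "kafka=host1,host2; sasl-user=alice" into a key/value map.
// Hypothetical helper for illustration only.
func parseCDCOptions(s string) (map[string]string, error) {
	opts := make(map[string]string)
	for _, part := range strings.Split(s, ";") {
		part = strings.TrimSpace(part)
		if part == "" {
			continue // tolerate trailing or doubled separators
		}
		kv := strings.SplitN(part, "=", 2)
		if len(kv) != 2 {
			return nil, fmt.Errorf("invalid cdc option %q: expected key=value", part)
		}
		key := strings.TrimSpace(kv[0])
		if _, dup := opts[key]; dup {
			return nil, fmt.Errorf("duplicate cdc option %q", key)
		}
		// Values may contain commas (the kafka host list), so only '=' and ';' are special.
		opts[key] = strings.TrimSpace(kv[1])
	}
	return opts, nil
}

func main() {
	opts, err := parseCDCOptions("kafka=host1,host2; sasl-user=alice; ca-cert=/certs/ca.crt")
	if err != nil {
		panic(err)
	}
	fmt.Println(opts["kafka"], opts["sasl-user"], opts["ca-cert"])
}
```

Rejecting duplicate keys is a design choice of this sketch; silently letting the last value win is the other common option.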

Test cases verified:

  1. The sink fails and comes back up. Pending events are sent to the sink.
  2. The sink fails, and then the leader fails. When the sink comes back up, events reach the sink.
  3. Waiting txns are sent, and pending events are cleared over time.
  4. Aborted txns are cleared.
  5. On leadership changes, old events are not sent.
  6. With the live loader, all events are received.
  7. With the bulk loader, no events are emitted.

A sample event from the file-based sink looks like this:

{ "key": "0", "value": {"meta":{"commit_ts":5},"type":"mutation","event":{"operation":"set","uid":2,"attr":"counter.val","value":1,"value_type":"int"}}}
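For consumers of the file-based sink, a record like the one above can be decoded with the standard `encoding/json` package. The struct shapes are inferred from this single sample only (an assumption: other event types or value types may carry different fields), so `value` is kept as `json.RawMessage` and interpreted later according to `value_type`. The type and function names are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sinkRecord mirrors the outer {"key": ..., "value": ...} wrapper of the sample.
type sinkRecord struct {
	Key   string   `json:"key"`
	Value cdcEvent `json:"value"`
}

// cdcEvent mirrors the "value" object of the sample event.
type cdcEvent struct {
	Meta struct {
		CommitTs uint64 `json:"commit_ts"`
	} `json:"meta"`
	Type  string        `json:"type"`
	Event mutationEvent `json:"event"`
}

// mutationEvent mirrors the "event" object of a "mutation" event.
type mutationEvent struct {
	Operation string          `json:"operation"`
	Uid       uint64          `json:"uid"`
	Attr      string          `json:"attr"`
	Value     json.RawMessage `json:"value"` // concrete type depends on value_type
	ValueType string          `json:"value_type"`
}

const sample = `{ "key": "0", "value": {"meta":{"commit_ts":5},"type":"mutation","event":{"operation":"set","uid":2,"attr":"counter.val","value":1,"value_type":"int"}}}`

// decodeRecord parses one line written by the file sink.
func decodeRecord(line string) (sinkRecord, error) {
	var rec sinkRecord
	err := json.Unmarshal([]byte(line), &rec)
	return rec, err
}

func main() {
	rec, err := decodeRecord(sample)
	if err != nil {
		panic(err)
	}
	ev := rec.Value.Event
	// prints "commit_ts=5 set uid=2 counter.val=1 (int)"
	fmt.Printf("commit_ts=%d %s uid=%d %s=%s (%s)\n",
		rec.Value.Meta.CommitTs, ev.Operation, ev.Uid, ev.Attr, ev.Value, ev.ValueType)
}
```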


manishrjain (Contributor) left a comment

Reviewable status: 0 of 14 files reviewed, 25 unresolved discussions (waiting on @aman-bansal and @vvbalaji-dgraph)


dgraph/cmd/alpha/run.go, line 206 at r1 (raw file):

	Sample flag could look like --audit dir=aa;encrypt_file=/filepath;compress=true`)

	flag.String("change_data", "",

This can be called cdc. In the description, you can call it change data capture.

The sink should just be part of this (no need to call anything "sink").

kafka is an option and so on.
file is an option.


dgraph/cmd/alpha/run.go, line 209 at r1 (raw file):

		`Various change data capture options.
	enabled=true/false to enable change data capture. Default is false.
	max_recovery=N to define the maximum amount of pending txn events can lag behind the

switch all underscores to dashes.


dgraph/cmd/alpha/run.go, line 214 at r1 (raw file):

	flag.String("sink", "",
		`Various sink config options.

Various sink options for Change Data Capture.


dgraph/cmd/alpha/run.go, line 215 at r1 (raw file):

	flag.String("sink", "",
		`Various sink config options.
	destination=kafka://host1,host2 to define comma separated list of host.

Use kafka as the key.
Use file as the key.

You can have as many keys as you want, so there's no need to use a "destination" key for everything.

kafka=host1,host2
file=...
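With one key per sink, choosing the sink becomes a lookup on whichever key is present rather than parsing a `destination=kafka://...` URL. A minimal sketch, assuming the options have already been parsed into a map; `chooseSink` and `sinkConfig` are hypothetical names, and rejecting a config that sets both `kafka` and `file` is an assumption of this sketch, not confirmed Dgraph behavior.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// sinkConfig is a hypothetical description of which sink to build.
type sinkConfig struct {
	Kind  string   // "kafka" or "file"
	Hosts []string // Kafka brokers, when Kind == "kafka"
	Dir   string   // output directory, when Kind == "file"
}

// chooseSink picks the sink from whichever key is present.
func chooseSink(opts map[string]string) (sinkConfig, error) {
	kafka, hasKafka := opts["kafka"]
	dir, hasFile := opts["file"]
	switch {
	case hasKafka && hasFile:
		return sinkConfig{}, errors.New("cdc: set only one of kafka or file")
	case hasKafka:
		var hosts []string
		for _, h := range strings.Split(kafka, ",") {
			if h = strings.TrimSpace(h); h != "" {
				hosts = append(hosts, h)
			}
		}
		if len(hosts) == 0 {
			return sinkConfig{}, errors.New("cdc: kafka option has no hosts")
		}
		return sinkConfig{Kind: "kafka", Hosts: hosts}, nil
	case hasFile:
		return sinkConfig{Kind: "file", Dir: dir}, nil
	default:
		return sinkConfig{}, errors.New("cdc: no sink configured")
	}
}

func main() {
	cfg, err := chooseSink(map[string]string{"kafka": "host1, host2"})
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.Kind, cfg.Hosts) // kafka [host1 host2]
}
```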

dgraph/cmd/alpha/run.go, line 216 at r1 (raw file):

		`Various sink config options.
	destination=kafka://host1,host2 to define comma separated list of host.
	sasl_user=username to define sasl username for kafka.

sasl-user=


protos/pb.proto, line 321 at r1 (raw file):

	uint64 expected_checksum 	= 11; // Block an operation until membership reaches this checksum.
	RestoreRequest restore 		= 12;
	uint64 CDCMinReadTs 			= 13;

CDC_ts


systest/change_data/change_data_test.go, line 2 at r1 (raw file):

/*
 * Copyright 2017-2021 Dgraph Labs, Inc. and Contributors

worker/change_data.go, line 4 at r1 (raw file):

/*
 * Copyright 2021 Dgraph Labs, Inc. and Contributors

Instead of calling it change_data, just call it cdc.go.


worker/change_data_ee.go, line 4 at r1 (raw file):

/*
 * Copyright 2021 Dgraph Labs, Inc. and Contributors

cdc_ee.go


worker/change_data_ee.go, line 34 at r1 (raw file):

const defaultCDCConfig = "enabled=false; max_recovery=10000"

type ChangeData struct {

type CDC struct {


worker/change_data_ee.go, line 45 at r1 (raw file):

}

func initChangeDataCapture() *ChangeData {

Sounds like it's a new object: newCDC


worker/change_data_ee.go, line 113 at r1 (raw file):

			msgs[i] = SinkMessage{
				Meta: SinkMeta{
					Topic: "dgraph_cdc",

use dashes everywhere.


worker/change_data_ee.go, line 142 at r1 (raw file):

		// if cdc is lagging behind the current via maxRecoveryEntries,
		// skip ahead the cdcIndex to prevent uncontrolled growth of raft logs.
		if uint64(len(cd.pendingEvents)) > cd.maxRecoveryEntries {

If Alpha has taken a snapshot beyond the index you think you're at, clear pendingTxns; that is, when the first index is beyond the first index you last saw.
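The check suggested above can be sketched as follows, assuming `firstIndex` is the first Raft index still present in the log (what a Raft storage's `FirstIndex` reports, i.e. snapshot index + 1). The `cdcState` type and `catchUpAfterSnapshot` method are hypothetical stand-ins, not the PR's code.

```go
package main

import "fmt"

// cdcState is a hypothetical, trimmed-down view of the CDC tracker.
type cdcState struct {
	index   uint64              // last Raft index CDC has processed
	pending map[uint64][]string // start ts -> buffered events awaiting commit
}

// catchUpAfterSnapshot detects that the log was compacted past c.index, so
// the entries CDC still needed are gone. Buffered events from that range can
// no longer be completed, so they are dropped and CDC resumes just before
// firstIndex. It reports whether a skip happened.
func (c *cdcState) catchUpAfterSnapshot(firstIndex uint64) bool {
	if firstIndex <= c.index+1 {
		return false // every entry after c.index is still in the log
	}
	c.pending = make(map[uint64][]string)
	c.index = firstIndex - 1
	return true
}

func main() {
	c := &cdcState{index: 10, pending: map[uint64][]string{7: {"set uid=2"}}}
	skipped := c.catchUpAfterSnapshot(11)
	fmt.Println(skipped, c.index) // false 10
	skipped = c.catchUpAfterSnapshot(50)
	fmt.Println(skipped, c.index, len(c.pending)) // true 49 0
}
```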


worker/change_data_ee.go, line 144 at r1 (raw file):

		if uint64(len(cd.pendingEvents)) > cd.maxRecoveryEntries {
			glog.Info("too many pending cdc events. Skipping for now.")
			cd.updateMinReadTs(cd.maxReadTs)

cdc.updateTs(...)

Skip ahead to the latest timestamp (MaxAssigned). We're going to lose a bunch of entries here.

Actually, just start from the firstIndex when the instance starts. Don't worry about maxReadTs.

You don't need maxRecoveryEntries. Our snapshots can't progress if we're unable to send. So, when taking a snapshot, we should make a judgment about whether to take a snapshot beyond the sent timestamp. If we do, this logic works here automatically.
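One way to read this suggestion: the snapshot, not a pending-event counter, decides how far CDC may fall behind. A minimal sketch under that reading; `chooseSnapshotIndex` and the `maxLag` knob are hypothetical and stand in for "making a judgment", and when the snapshot does move past CDC, the skip-ahead logic above takes over.

```go
package main

import "fmt"

// chooseSnapshotIndex decides how far a snapshot may compact the Raft log.
// candidate is the index the node would normally snapshot at; cdcIndex is
// the last index whose events CDC has sent. maxLag is a hypothetical bound
// on how long CDC may hold snapshots back before we compact anyway.
func chooseSnapshotIndex(candidate, cdcIndex, maxLag uint64) uint64 {
	if cdcIndex >= candidate {
		return candidate // CDC is caught up; nothing to protect
	}
	if candidate-cdcIndex > maxLag {
		return candidate // CDC is too far behind; compact and let it skip ahead
	}
	return cdcIndex // hold the snapshot until CDC has sent these entries
}

func main() {
	fmt.Println(chooseSnapshotIndex(100, 120, 50)) // 100
	fmt.Println(chooseSnapshotIndex(100, 80, 50))  // 80
	fmt.Println(chooseSnapshotIndex(100, 10, 50))  // 100
}
```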


worker/change_data_ee.go, line 160 at r1 (raw file):

			for _, entry := range entries {
				if entry.Type != raftpb.EntryNormal || len(entry.Data) == 0 {
					cd.cdcIndex = entry.Index

cdc.Index


worker/change_data_ee.go, line 175 at r1 (raw file):

				// across the cluster in case of failures.
				if proposal.Mutations != nil && proposal.Mutations.StartTs > cd.getCDCMinReadTs() {
					events := transformMutationToCDCEvent(entry.Index, proposal.Mutations)

toCDCEvent


worker/change_data_ee.go, line 176 at r1 (raw file):

				if proposal.Mutations != nil && proposal.Mutations.StartTs > cd.getCDCMinReadTs() {
					events := transformMutationToCDCEvent(entry.Index, proposal.Mutations)
					if events == nil {

if len(events) == 0


worker/change_data_ee.go, line 185 at r1 (raw file):

					// We can set the read ts here only.
					if x.WorkerConfig.LudicrousMode {
						if err := sendEvents(nil, events); err != nil {

Better to build a batch of events and send it once you have, say, 1000 events.
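A minimal batching sketch that flushes when the batch is full and again at the end of each run, as the later comment on sendEvents also asks. The `batcher` type, its methods, and the use of strings for events are hypothetical simplifications.

```go
package main

import "fmt"

// batcher accumulates events and hands them to send in groups of at most size.
type batcher struct {
	size int
	buf  []string
	send func([]string) error
}

// add buffers one event and flushes once the batch is full.
func (b *batcher) add(ev string) error {
	b.buf = append(b.buf, ev)
	if len(b.buf) >= b.size {
		return b.flush()
	}
	return nil
}

// flush sends whatever is buffered. Call it at the end of every run so a
// partial batch is not left waiting.
func (b *batcher) flush() error {
	if len(b.buf) == 0 {
		return nil
	}
	if err := b.send(b.buf); err != nil {
		return err // keep the buffer so the caller can retry
	}
	b.buf = nil
	return nil
}

func main() {
	b := &batcher{size: 1000, send: func(evs []string) error {
		fmt.Println("sending batch of", len(evs))
		return nil
	}}
	for i := 0; i < 2500; i++ {
		if err := b.add(fmt.Sprintf("event-%d", i)); err != nil {
			panic(err)
		}
	}
	// End of the run: flush the partial batch (500 events here).
	if err := b.flush(); err != nil {
		panic(err)
	}
}
```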


worker/change_data_ee.go, line 186 at r1 (raw file):

					if x.WorkerConfig.LudicrousMode {
						if err := sendEvents(nil, events); err != nil {
							return

always log the error, or return the error. Don't silently ignore.


worker/change_data_ee.go, line 189 at r1 (raw file):

						}
						cd.cdcIndex = entry.Index
						cd.updateMinReadTs(proposal.Mutations.StartTs)

Since you're sending the events via a batching system, use a nested send function to send them. That's the right place to record the timestamp you have sent, and you should record it only after the events have been sent successfully.

If you're unable to send, retry indefinitely.


worker/change_data_ee.go, line 192 at r1 (raw file):

						continue
					}
					if cd.pendingEvents[proposal.Mutations.StartTs] == nil {

no need to do this if.


worker/change_data_ee.go, line 196 at r1 (raw file):

					}
					cd.pendingEvents[proposal.Mutations.StartTs] =
						append(cd.pendingEvents[proposal.Mutations.StartTs], events...)

Append can take nils.
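The two comments above rely on standard Go semantics: reading a missing map key yields a nil slice, and `append` on a nil slice allocates as needed, so no existence check is required before appending. `addPending` is a hypothetical helper name.

```go
package main

import "fmt"

// addPending appends events under their start ts. No nil check is needed:
// a missing key yields a nil slice, and append allocates on demand.
func addPending(pending map[uint64][]string, startTs uint64, events ...string) {
	pending[startTs] = append(pending[startTs], events...)
}

func main() {
	pending := make(map[uint64][]string)
	addPending(pending, 5, "set uid=2")
	addPending(pending, 5, "del uid=3")
	fmt.Println(len(pending[5])) // 2
}
```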


worker/change_data_ee.go, line 201 at r1 (raw file):

				if proposal.Delta != nil {
					for _, ts := range proposal.Delta.Txns {
						cd.maxReadTs = x.Max(cd.maxReadTs, ts.StartTs)

worker.MaxAssigned?


worker/change_data_ee.go, line 204 at r1 (raw file):

						pending := cd.pendingEvents[ts.StartTs]
						if ts.CommitTs > 0 && len(pending) > 0 {
							if err := sendEvents(ts, pending); err != nil {

you can do batching in sendEvents. Always flush at the end of each run.


worker/change_data_ee.go, line 210 at r1 (raw file):

						// delete from pending events once events are sent
						delete(cd.pendingEvents, ts.StartTs)
						_ = cd.evaluateAndSetMinReadTs()

This should be done inside sendEvents.

aman-bansal (Contributor, Author) left a comment

Reviewable status: 0 of 16 files reviewed, 25 unresolved discussions (waiting on @aman-bansal, @manishrjain, and @vvbalaji-dgraph)


dgraph/cmd/alpha/run.go, line 206 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

This can be called cdc. In the description, you can call it change data capture.

The sink should just be part of this (no need to call anything "sink").

kafka is an option and so on.
file is an option.

Done.


dgraph/cmd/alpha/run.go, line 209 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

switch all underscores to dashes.

Done.


dgraph/cmd/alpha/run.go, line 214 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Various sink options for Change Data Capture.

Done.


dgraph/cmd/alpha/run.go, line 215 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Use kafka as the key.
Use file as the key.

You can have as many keys as you want, so there's no need to use a "destination" key for everything.

kafka=host1,host2
file=...

Done.


dgraph/cmd/alpha/run.go, line 216 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

sasl-user=

Done.


protos/pb.proto, line 321 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

CDC_ts

Done.


systest/change_data/change_data_test.go, line 2 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Done.


worker/change_data.go, line 4 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Instead of calling it change_data, just call it cdc.go.

Done.


worker/change_data_ee.go, line 4 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

cdc_ee.go

Done.


worker/change_data_ee.go, line 34 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

type CDC struct {

Done.


worker/change_data_ee.go, line 45 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Sounds like it's a new object: newCDC

Done.


worker/change_data_ee.go, line 113 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

use dashes everywhere.

Done.


worker/change_data_ee.go, line 160 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

cdc.Index

Done.


worker/change_data_ee.go, line 175 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

toCDCEvent

Done.


worker/change_data_ee.go, line 176 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

if len(events) == 0

Done.


worker/change_data_ee.go, line 186 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

always log the error, or return the error. Don't silently ignore.

Done.


worker/change_data_ee.go, line 192 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

no need to do this if.

Done.


worker/change_data_ee.go, line 196 at r1 (raw file):

Previously, manishrjain (Manish R Jain) wrote…

Append can take nils.

Done.

aman-bansal force-pushed the aman/cdc_integration branch 3 times, most recently from 687f9ff to 3e76dbe, on February 5, 2021 12:54

jarifibrahim (Contributor) left a comment

LGTM. Fix the commented-out code.

Dismissed @manishrjain from a discussion.
Reviewable status: 0 of 16 files reviewed, 25 unresolved discussions (waiting on @aman-bansal, @manishrjain, and @vvbalaji-dgraph)


dgraph/cmd/alpha/run.go, line 214 at r1 (raw file):

Previously, aman-bansal (aman bansal) wrote…

Done.

nit: missing period.


worker/sink_handler.go, line 92 at r4 (raw file):

		caFile, err := ioutil.ReadFile(config.GetString("ca-cert"))
		if err != nil {
			return nil, err

wrap the errors

aman-bansal (Contributor, Author) left a comment

Reviewable status: 0 of 16 files reviewed, 25 unresolved discussions (waiting on @aman-bansal, @jarifibrahim, @manishrjain, and @vvbalaji-dgraph)


worker/sink_handler.go, line 92 at r4 (raw file):

Previously, jarifibrahim (Ibrahim Jarif) wrote…

wrap the errors

Done.

aman-bansal changed the title from "feat(enterpise): Change data capture integration with kafka" to "feat(enterpise): Change data capture (CDC) integration with kafka" on Feb 9, 2021
aman-bansal merged commit eb7e5e1 into master on Feb 9, 2021
aman-bansal deleted the aman/cdc_integration branch on February 9, 2021 19:40

3 participants