Documentation review
TPRobots authored and Egidijus Bartkus committed Jul 2, 2019
1 parent 0cd4edb commit 97726b7
Showing 6 changed files with 30 additions and 30 deletions.
24 changes: 12 additions & 12 deletions README.md
@@ -2,17 +2,17 @@

# kafka-connect-dynamodb

A plug-in for the [Kafka Connect framework](https://kafka.apache.org/documentation.html#connect) which implements a "source connector" for DynamoDB table Streams. This source connector extends [Confluent ecosystem](https://www.confluent.io/hub/) and allows replicating DynamoDB tables into Kafka topics. Once data is in Kafka you can use various Confluent sink connectors to push this data to different destinations systems for e.g. into BigQuery for easy analytics.
A [Kafka Connector](http://kafka.apache.org/documentation.html#connect) which implements a "source connector" for AWS DynamoDB table streams. This source connector allows replicating DynamoDB tables into Kafka topics. Once data is in Kafka you can use various Kafka sink connectors to push this data to different destination systems, e.g. BigQuery for easy analytics.

## Additional features
## Notable features
* `autodiscovery` - monitors and automatically discovers DynamoDB tables to start/stop syncing from (based on AWS TAGs)
* `INIT_SYNC` - automatically detects and if needed performs initial(existing) data replication before continuing tracking changes from the Stream
* `initial sync` - automatically detects and, if needed, performs initial (existing) data replication before tracking changes from the DynamoDB table stream

## Alternatives

We found only one existing alternative implementation by [shikhar](https://github.com/shikhar/kafka-connect-dynamodb), but it seems to be missing major features and is no longer supported.
Prior to our development we found only one existing implementation by [shikhar](https://github.com/shikhar/kafka-connect-dynamodb), but it seems to be missing major features (initial sync, handling shard changes) and is no longer supported.

Also it tries to manage DynamoDB Stream shards manually by using one Connect task to read from each DynamoDB Streams shard, but since DynamoDB Stream shards are dynamic compared to static ones in Kinesis streams this approach would require rebalancing all Confluent Connect cluster tasks far to often.
Also it tries to manage DynamoDB Stream shards manually by using one Kafka Connect task to read from each DynamoDB Streams shard, but since DynamoDB Stream shards are dynamic, compared to the static ones in Kinesis streams, this approach would require rebalancing all Kafka Connect cluster tasks far too often.

By contrast, our implementation opts to use the Amazon Kinesis Client with the DynamoDB Streams Kinesis Adapter, which takes care of all shard reading and tracking tasks.

@@ -22,7 +22,7 @@ Contrary in our implementation we opted to use Amazon Kinesis Client with Dynamo

* Java 8
* Gradle 5.3.1
* Confluent Kafka Connect Framework >= 2.1.1
* Kafka Connect Framework >= 2.1.1
* Amazon Kinesis Client 1.9.1
* DynamoDB Streams Kinesis Adapter 1.4.0

Expand All @@ -36,16 +36,16 @@ Contrary in our implementation we opted to use Amazon Kinesis Client with Dynamo

* KCL (Amazon Kinesis Client) keeps metadata in a separate dedicated DynamoDB table for each DynamoDB Stream it is tracking. This means that one additional table will be created for each table this connector is tracking.

* Current implementation supports only one Confluent task(= KCL worker) reading from one table at any given time.
* This limits maximum throughput possible from one table to about **~2000 records(change events) per second**.
* This limitation is imposed by current plugin logic and not by the KCL library or Kafka connect framework. Running multiple tasks would require additional synchronization mechanisms for `INIT SYNC` state tracking and might be implemented in feature.
* Current implementation supports only one Kafka Connect task (= KCL worker) reading from one table at any given time.
* Due to this limitation we tested the maximum throughput from one table to be **~2000 records (change events) per second**.
* This limitation is imposed by the current connector logic and not by the KCL library or Kafka Connect framework. Running multiple tasks would require additional synchronization mechanisms for `INIT_SYNC` state tracking and might be implemented in the future.

* Running multiple tasks for different tables on the same JVM has a negative impact on the overall performance of those tasks.
* This is so because Amazon Kinesis Client library has some global locking happening.
* This is because the Amazon Kinesis Client library has some global locking happening.
* This issue has been solved in newer KCL versions, but reading from DynamoDB Streams requires the DynamoDB Streams Kinesis Adapter library, which still depends on the older Amazon Kinesis Client 1.9.1.
* That being said, you will only encounter this issue when running lots of tasks on one machine under really high load.

* DynamoDB table unit capacity must be large enough to enable `INIT_SYNC` to finished in around 16 hours. Otherwise you risk `INIT_SYNC` being restarted just as soon as it's finished because DynamoDB Streams store change events only for 24 hours.
* DynamoDB table unit capacity must be large enough for `INIT_SYNC` to finish in around 16 hours. Otherwise there is a risk of `INIT_SYNC` being restarted just as soon as it finishes, because DynamoDB Streams store change events for only 24 hours.

* Required AWS roles:
```json
@@ -126,7 +126,7 @@ We use [SemVer](http://semver.org/) for versioning. For the versions available,

## Releases

Releases are done by creating new release(aka tag) via Github user interface. Once created Travis will pick it up, build and upload final .jar file as asset for the Github release.
Releases are done by creating a new release (aka tag) via the GitHub user interface. Once created, Travis CI will pick it up, build it and upload the final .jar file as an asset for the GitHub release.

## Roadmap (TODO: move to issues?)

4 changes: 2 additions & 2 deletions docs/data.md
@@ -2,7 +2,7 @@
# Topic messages structure

* `Topic key` - contains all defined DynamoDB table keys.
* `Topic value` - contains ful DynamoDB document serialised as DynamoDB Json string together with additional metadata.
* `Topic value` - contains the full DynamoDB document serialised as a DynamoDB JSON string together with additional metadata.

```json
[
@@ -42,7 +42,7 @@
* `u` - existing record updated
* `d` - existing record deleted

`init_sync_start` - is set then `INIT_SYNC` starts and will retain same value not only for `INIT_SYNC` records but for all following events as well. Untill next `INIT_SYNC` events happens.
`init_sync_start` - is set when `INIT_SYNC` starts and retains the same value not only for `INIT_SYNC` records but for all following events as well, until the next `INIT_SYNC` happens.

## Delete records

24 changes: 12 additions & 12 deletions docs/details.md
@@ -3,44 +3,44 @@

### 1. Source tables discovery & Source tasks configuration

This plugin can sync multiple DynamoDB tables at the same time and it does so without requiring explicit configuration for each one. On start and at regular intervals after that(by default 60s) it queries AWS api for tables which match following criteria and starts Kafka Connect task for each of them:
This connector can sync multiple DynamoDB tables at the same time and it does so without requiring explicit configuration for each one. On start and at regular time intervals (by default 60s) it queries the AWS API for DynamoDB tables which match the following criteria and starts a Kafka Connect task for each of them:
* table must have the configured ingestion TAG key set
* table must have the configured stack (environment) TAG key and value set
* table must have DynamoDB streams enabled (in `new_image` or `new_and_old_image` mode)


### 2. "INIT_SYNC"

`INIT_SYNC` is a process when all existing table data is scanned and pushed into Kafka destination topic. Usually this happens only once after source task for specific table is started for the thirst time. But it can be repeated in case of unexpected issues. For e.g. if source connector was down for long period of time and it is possible that it has missed some of the change events from the table stream (as data is stored only for 24 hours).
`INIT_SYNC` is the process during which all existing table data is scanned and pushed into the Kafka destination topic. Usually this happens only once, after the source task for a specific table is started for the first time, but it can be repeated in case of unexpected issues, e.g. if the source connector was down for a long period of time and might have missed some of the change events from the table stream (as data is stored only for 24 hours). A rough sketch of such a batched scan is shown below.
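
As an illustration only (not the connector's actual code), a batched table scan of this kind might look roughly like the sketch below, using the AWS SDK for Java v1; the table name, page size and `pushToKafka` helper are hypothetical placeholders.

```java
import java.util.Map;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.ScanResult;

public class InitSyncSketch {

    // Placeholder for the logic that turns a scanned item into a Kafka record.
    static void pushToKafka(Map<String, AttributeValue> item) { /* ... */ }

    public static void main(String[] args) {
        AmazonDynamoDB dynamoDb = AmazonDynamoDBClientBuilder.defaultClient();
        Map<String, AttributeValue> exclusiveStartKey = null;

        // Scan the source table in batches until LastEvaluatedKey is empty.
        do {
            ScanRequest request = new ScanRequest()
                    .withTableName("my-source-table") // hypothetical table name
                    .withLimit(1000)                  // batch size is illustrative
                    .withExclusiveStartKey(exclusiveStartKey);
            ScanResult result = dynamoDb.scan(request);
            result.getItems().forEach(InitSyncSketch::pushToKafka);
            exclusiveStartKey = result.getLastEvaluatedKey();
            // The real connector also checkpoints its progress at regular intervals here.
        } while (exclusiveStartKey != null && !exclusiveStartKey.isEmpty());
    }
}
```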

### 3. "SYNC"

Once `INIT_SYNC` is finished source task switches into DynamoDB Streams consumer state. There all changes happening to the source table are represented in this stream and copied over to the Kafka's destination topic. Consumers of this topic can recreate full state of the source table at any given time.
Once `INIT_SYNC` is finished, the source task switches into the DynamoDB Streams consumer state. All changes that happen to the source table are represented in this stream and copied over to the Kafka destination topic. Consumers of this topic can recreate the full state of the source table at any given time.

# How does it work

This plugin depends on Kafka Connect framework for most tasks related to Kafka and uses Kinesis Client Library(KCL) + DynamoDB Streams Adapter libraries for DynamoDB Streams consumption.
This connector depends on the Kafka Connect framework for most tasks related to Kafka and uses the Kinesis Client Library (KCL) and DynamoDB Streams Kinesis Adapter libraries for DynamoDB Streams consumption.

Read the following articles to familiarize yourself with them:
* [Connector Developer Guide](https://docs.confluent.io/current/connect/devguide.html)
* [KCL](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html)
* [KCL DynamoDB adapter](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html)

At it's core this plugin starts one Kafka Connect task for each table it syncs. And each task starts a dedicated KCL(Kinesis Consumer Library) worker to read data from the stream.
At its core this connector starts one Kafka Connect task for each table it syncs, and each task starts a dedicated KCL (Kinesis Client Library) worker to read data from the stream. A rough sketch of how such a worker might be wired up is shown below.
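
For orientation only, wiring a KCL 1.x worker to a DynamoDB stream through the adapter might look roughly like the sketch below. The application name, worker id and record-processor factory are placeholders, and the exact constructors and class names should be verified against the Amazon Kinesis Client 1.9.1 and DynamoDB Streams Kinesis Adapter 1.4.0 versions listed in the README.

```java
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

public class KclWorkerSketch {

    public static Worker buildWorker(IRecordProcessorFactory factory, String streamArn) {
        // The adapter exposes a DynamoDB stream through the Kinesis client interface,
        // so a regular KCL worker can consume it.
        AmazonDynamoDBStreamsAdapterClient adapter =
                new AmazonDynamoDBStreamsAdapterClient(AmazonDynamoDBStreamsClientBuilder.defaultClient());

        KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
                "my-connector-app",                      // hypothetical KCL application name
                streamArn,                               // the DynamoDB stream ARN stands in for the stream name
                new DefaultAWSCredentialsProviderChain(),
                "worker-1")                              // hypothetical worker id
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

        return new Worker.Builder()
                .recordProcessorFactory(factory)
                .config(config)
                .kinesisClient(adapter)
                .dynamoDBClient(AmazonDynamoDBClientBuilder.defaultClient())
                .cloudWatchClient(AmazonCloudWatchClientBuilder.defaultClient())
                .build();
    }
}
```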

## State tracking

Connector tracks it's state at all stages and is able to continue there it stopped after restart. But, state and progress **tracking happens at regular intervals** and not after each processed event. Meaning that there can and **will be event duplicates in destination topic**!
The connector tracks its state at all stages and is able to continue where it stopped after a restart. However, state and progress **tracking happens at regular intervals** and not after each processed event, meaning that there can and **will be event duplicates in the destination topic**!

Since we are using two different frameworks/libraries together, there are two different ways in which each of them stores state:
* Kafka connect leverages dedicated `state` topics there connector tasks can push offsets(state) for each partition they are consuming. This plugin has no support for source table "partitions" and only one task is allowed to consume one table at a time therefor it uses table name as partition key and leverage `offsets` dictionary to store tasks state and progress of that state.
* KCL library uses separate dedicated DynamoDB table for each DynamoDB Stream it tracks to remember it's own progress. It is used only to track which messages have been consumed already. Since we can only say that message has been consumed once it's delivered to Kafka special synchronization logic is implemented in this plugin.
* Kafka Connect leverages dedicated `state` topics where connector tasks can push offsets (state) for each partition they are consuming. This connector has no support for source table "partitions" and only one task is allowed to consume one table at a time, therefore it uses the table name as the partition key and leverages the `offsets` dictionary to store task state and progress. A rough sketch of this is shown after this list.
* The KCL library uses a separate dedicated DynamoDB table for each DynamoDB Stream it tracks to remember its own progress. It is used only to track which messages have been consumed already. Since we can only say that a message has been consumed once it has been delivered to Kafka, special synchronization logic is implemented in this connector.
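
As a simplified, hypothetical sketch (not the connector's actual classes), tracking per-table progress through the Kafka Connect offsets API could look like this; the `table_name` and `init_sync_state` key names are illustrative only.

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;

public class OffsetSketch {

    // One table == one source "partition"; the table name acts as the partition key.
    static Map<String, Object> sourcePartition(String tableName) {
        return Collections.singletonMap("table_name", tableName); // key name is illustrative
    }

    // Attach the current progress to each emitted record; Kafka Connect
    // persists it to the offsets topic at regular intervals.
    static SourceRecord toSourceRecord(String tableName, String topic, Object value, String initSyncState) {
        Map<String, Object> sourceOffset =
                Collections.singletonMap("init_sync_state", initSyncState); // illustrative offset payload
        return new SourceRecord(sourcePartition(tableName), sourceOffset, topic, null, value);
    }

    // On (re)start, read back the last committed offset for this table, if any.
    static Map<String, Object> lastCommittedOffset(SourceTaskContext context, String tableName) {
        return context.offsetStorageReader().offset(sourcePartition(tableName));
    }
}
```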

> NOTE: KCL library separate `state` table in DynamoDB for each stream it tracks! This table is created automatically if it doesn't exist.
> NOTE: The KCL library uses a separate `state` table in DynamoDB for each stream it tracks! This table is created automatically if it doesn't exist.
### `DISCOVERY` and task configuration

Plugin uses resource group api to receive a list of DynamoDB tables which have ingestion TAG defined. Then it iterates over this list and checks if stack TAG is set and streams are actually enabled. For each table which meats all requirements separate dedicated Kafka Connect task is started.
The connector uses the resource groups tagging API to receive a list of DynamoDB tables which have the ingestion TAG defined. Then it iterates over this list and checks that the stack TAG is set and streams are actually enabled. For each table which meets all requirements a separate dedicated Kafka Connect task is started. A rough sketch of this check is shown below.
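
A rough, illustrative sketch of this check using the AWS SDK for Java v1 (tag keys, ARN parsing and error handling are simplified and may differ from the connector's actual implementation):

```java
import java.util.List;
import java.util.stream.Collectors;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.resourcegroupstaggingapi.AWSResourceGroupsTaggingAPI;
import com.amazonaws.services.resourcegroupstaggingapi.AWSResourceGroupsTaggingAPIClientBuilder;
import com.amazonaws.services.resourcegroupstaggingapi.model.GetResourcesRequest;
import com.amazonaws.services.resourcegroupstaggingapi.model.ResourceTagMapping;
import com.amazonaws.services.resourcegroupstaggingapi.model.TagFilter;

public class DiscoverySketch {

    public static List<String> discoverTables(String ingestionTagKey, String envTagKey, String envTagValue) {
        AWSResourceGroupsTaggingAPI tagging = AWSResourceGroupsTaggingAPIClientBuilder.defaultClient();
        AmazonDynamoDB dynamoDb = AmazonDynamoDBClientBuilder.defaultClient();

        // 1. Find DynamoDB tables carrying the ingestion tag and the expected environment tag.
        //    (Pagination via the response token is omitted for brevity.)
        GetResourcesRequest request = new GetResourcesRequest()
                .withResourceTypeFilters("dynamodb:table")
                .withTagFilters(
                        new TagFilter().withKey(ingestionTagKey),
                        new TagFilter().withKey(envTagKey).withValues(envTagValue));

        return tagging.getResources(request).getResourceTagMappingList().stream()
                .map(ResourceTagMapping::getResourceARN)
                .map(arn -> arn.substring(arn.lastIndexOf('/') + 1)) // table name from the ARN
                // 2. Keep only tables that have streams enabled in a NEW_IMAGE / NEW_AND_OLD_IMAGES mode.
                .filter(table -> {
                    StreamSpecification spec =
                            dynamoDb.describeTable(table).getTable().getStreamSpecification();
                    return spec != null
                            && Boolean.TRUE.equals(spec.getStreamEnabled())
                            && spec.getStreamViewType().startsWith("NEW_");
                })
                .collect(Collectors.toList());
    }
}
```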

The same `discovery` phase is executed on start and every 60 seconds after that (default config value). Each started task can be in one of the following states.

@@ -53,8 +53,8 @@ During `INIT_SYNC` phase all records from source table are scanned in batches. A
### `SYNC` state

After `INIT_SYNC` plugin starts reading messages from DynamoDB Stream. Thirst thing it makes sure is to drop all events which happened before `INIT_SYNC` was started (except for those created during last hour before `INIT_SYNC`). This is done to prevent unnecessary duplicate events(since we already have latest state) and to advance KCL reader into `save zone`.
After `INIT_SYNC` the connector starts reading messages from the DynamoDB Stream. First it makes sure to drop all events which happened before `INIT_SYNC` was started (except for those created during the last hour before `INIT_SYNC`). This is done to prevent unnecessary duplicate events (since we already have the latest state) and to advance the KCL reader into the `save zone`.

Events are considered to be in `save zone` if they there create no earlier then -20 hours before `now`. Otherwise plugin has no way to validate that it hasn't skipped some of the events and it has to initiate `INIT_SYNC`!
Events are considered to be in the `save zone` if they were created no earlier than 20 hours before `now`. Otherwise the connector has no way to validate that it hasn't skipped some of the events and it has to initiate `INIT_SYNC`! A minimal sketch of this cut-off check follows the note below.

> NOTE: DynamoDB Streams store data for 24 hours only
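
A minimal sketch of the cut-off check described above, assuming the 20-hour window:

```java
import java.time.Duration;
import java.time.Instant;

public class SaveZoneSketch {

    private static final Duration SAVE_ZONE = Duration.ofHours(20);

    // An event is safe to consume if it was created no more than 20 hours ago.
    // Older events may already be close to falling out of the 24-hour stream retention,
    // so the connector cannot prove nothing was missed and has to re-run INIT_SYNC.
    static boolean isInSaveZone(Instant eventCreatedAt, Instant now) {
        return !eventCreatedAt.isBefore(now.minus(SAVE_ZONE));
    }
}
```
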
2 changes: 1 addition & 1 deletion docs/getting-started.md
@@ -23,7 +23,7 @@ These instructions will get you running this connector on your own machine. Chec

### Running connector

* First we need to make some configuration change to make connector package available to Kafka Connect:
* First we need to perform some configuration changes to make the connector package available to Kafka Connect:

* Store the downloaded connector jar file in a location on your filesystem. For instance: `/opt/connectors/my-dynamodb-connector`
* Edit file `${CONFLUENT_PLATFORM_HOME}/etc/schema-registry/connect-avro-distributed.properties` and set `plugin.path=/opt/connectors`
4 changes: 2 additions & 2 deletions docs/options.md
@@ -32,7 +32,7 @@
"connect.dynamodb.rediscovery.period": "60000"
}
```
`dynamodb.table.env.tag.key` - tag key used to define environment(stack). Useful if you have `staging` and `production` under same AWS account. Or if you want to use different Confluent Connect clusters to sync different tables.
`dynamodb.table.env.tag.key` - tag key used to define the environment (stack). Useful if you have `staging` and `production` under the same AWS account, or if you want to use different Kafka Connect clusters to sync different tables.

`dynamodb.table.env.tag.value` - defines from which environment or stack to ingest tables, e.g. 'staging' or 'production'.

@@ -42,7 +42,7 @@

`tasks.max` - **MUST** always exceed the number of tables found for tracking. If the max tasks count is lower than the found tables count, no tasks will be started!

`init.sync.delay.period` - time value in seconds. Defines how long `INIT_SYNC` should delay execution before starting. This is used to give time for Confluent Connect tasks to calm down after rebalance (Since multiple tasks rebalances can happen in quick succession and this would mean more duplicated data since `INIT_SYNC` process won't have time mark it's progress).
`init.sync.delay.period` - time value in seconds. Defines how long `INIT_SYNC` should delay execution before starting. This is used to give Kafka Connect tasks time to settle down after a rebalance (since multiple task rebalances can happen in quick succession, which would mean more duplicated data because the `INIT_SYNC` process won't have time to mark its progress).

`connect.dynamodb.rediscovery.period` - time interval in milliseconds. Defines how often the connector should try to find new DynamoDB tables (or detect removed ones). If changes are found, tasks are automatically reconfigured.

2 changes: 1 addition & 1 deletion source/build.gradle
@@ -1,4 +1,4 @@
description = "Kafka Connect Source plugin that reads from DynamoDB streams"
description = "Kafka Connect Source connector that reads from DynamoDB streams"

dependencies {
implementation 'com.google.code.gson:gson:2.8.2'
