Table whitelist config parameter (#7)
integration tests

- updated kinesis adapter
- service endpoints config
- fixed credential config types

updated kinesis-adapter

use gson only

integration test task

test switch from init sync

table whitelist config parameter

kcl.table.billing.mode config parameter

add aws-java-sdk-sts dependency

#5

params

cleaning

cleanup

docs
raimondast authored Nov 19, 2020
1 parent d319c32 commit c35e7a5
Showing 24 changed files with 953 additions and 134 deletions.
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ A [Kafka Connector](http://kafka.apache.org/documentation.html#connect) which im
## Notable features
* `autodiscovery` - monitors and automatically discovers DynamoDB tables to start/stop syncing from (based on AWS TAG's)
* `initial sync` - automatically detects and, if needed, performs initial (existing) data replication before tracking changes from the DynamoDB table stream

* `local debugging` - use of test containers to test full connector life-cycle
## Alternatives

Prior to our development we found only one existing implementation, by [shikhar](https://github.com/shikhar/kafka-connect-dynamodb), but it seems to be missing major features (initial sync, handling shard changes) and is no longer supported.
@@ -22,7 +22,7 @@ In our implementation we opted to use Amazon Kinesis Client with DynamoDB Stream
* Gradlew 5.3.1
* Kafka Connect Framework >= 2.1.1
* Amazon Kinesis Client 1.9.1
* DynamoDB Streams Kinesis Adapter 1.4.0
* DynamoDB Streams Kinesis Adapter 1.5.2

## Documentation
* [Getting started](docs/getting-started.md)
@@ -97,6 +97,9 @@ In our implementation we opted to use Amazon Kinesis Client with DynamoDB Stream
# build & run unit tests
./gradlew

# run integration tests
./gradlew integrationTests

# build final jar
./gradlew shadowJar
```
@@ -128,7 +131,6 @@ Releases are done by creating new release(aka tag) via Github user interface. On

## Roadmap (TODO: move to issues?)

* Add Confluent stack as docker-compose.yml for easier local debugging
* Use multithreaded DynamoDB table scan for faster `INIT SYNC`


34 changes: 32 additions & 2 deletions build.gradle
@@ -45,15 +45,27 @@ allprojects {


dependencies {
testImplementation('org.junit.jupiter:junit-jupiter:5.4.1')
testCompile ("org.junit.jupiter:junit-jupiter-params:5.3.2")
def junitJupiterVersion = '5.6.2'
testImplementation "org.junit.jupiter:junit-jupiter:$junitJupiterVersion"
testCompile "org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion"
testCompile "org.junit.jupiter:junit-jupiter-params:$junitJupiterVersion"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion"
implementation 'io.rest-assured:rest-assured:4.3.1'
testCompile "org.testcontainers:testcontainers:1.14.3"
testCompile "org.testcontainers:junit-jupiter:1.14.3"
testCompile "org.testcontainers:kafka:1.15.0-rc2"
testCompile "org.testcontainers:mockserver:1.15.0-rc2"
testCompile "org.mock-server:mockserver-client-java:5.11.1"
testCompile "com.google.code.gson:gson:2.8.6"

testCompile group: 'org.mockito', name: 'mockito-junit-jupiter', version: '2.26.0'
compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.11.2'
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.11.2'
compile group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: '2.11.2'
}

test {
exclude '**/**IT**'
useJUnitPlatform()
testLogging {
outputs.upToDateWhen {false}
@@ -66,8 +78,26 @@ allprojects {
}
}
}

task integrationTests(type: Test) {
dependsOn shadowJar
useJUnitPlatform()
include '**/**IT**'
testLogging {
outputs.upToDateWhen {false}
events = ["passed", "failed", "skipped"]
showStandardStreams = true
afterSuite { desc, result ->
if (!desc.parent) { // will match the outermost suite
println "Results: ${result.resultType} (${result.testCount} tests, ${result.successfulTestCount} successes, ${result.failedTestCount} failures, ${result.skippedTestCount} skipped)"
}
}
}
}
}



dependencies {
compile project(':source')
}
4 changes: 2 additions & 2 deletions docs/details.md
@@ -8,7 +8,7 @@ This connector can sync multiple DynamoDB tables at the same time and it does so
* environment TAG key and value set
* DynamoDB streams enabled (in `new_image` or `new_and_old_image` mode)


> Note: if the `dynamodb.table.whitelist` parameter is set, auto-discovery is not executed and replication is performed only for the explicitly defined tables.
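For illustration, a minimal connector config relying on the whitelist instead of tag-based discovery might look like the sketch below (the connector class name, table names, and topic prefix are placeholders, not taken from this commit):

```json
{
  "name": "dynamodb-source",
  "connector.class": "com.trustpilot.connector.dynamodb.DynamoDBSourceConnector",
  "dynamodb.table.whitelist": "orders,customers",
  "kafka.topic.prefix": "dynamodb-",
  "tasks.max": "1"
}
```

With a whitelist in place, the `DISCOVERY` phase skips the resource-group tag lookup entirely.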
### 2. "INIT_SYNC"

`INIT_SYNC` is a process during which all existing table data is scanned and pushed into the destination Kafka topic. Usually this happens only once, after the source task for a specific table is started for the first time. But it can be repeated in case of unexpected issues, e.g. if the source connector was down for a long period of time and may have missed some of the change events from the table stream (DynamoDB streams store data for 24 hours only).
@@ -40,7 +40,7 @@ Since we are using two different frameworks/libraries together there are two dif
### `DISCOVERY` state and task configuration

Connector uses AWS resource group API to receive a list of DynamoDB tables which have ingestion TAG defined. Then it iterates over this list and checks if environment TAG is matched and streams are actually enabled. Connect task is started for each table which meets all requirements.
If the `dynamodb.table.whitelist` parameter is not defined, the connector uses the AWS resource group API to receive a list of DynamoDB tables which have the ingestion TAG defined. Then it iterates over this list and checks whether the environment TAG matches and streams are actually enabled. A Connect task is started for each table which meets all requirements.

`discovery` phase is executed on start and every 60 seconds (default config value) after the initial start.

12 changes: 11 additions & 1 deletion docs/options.md
@@ -24,7 +24,13 @@
"dynamodb.table.env.tag.key": "environment",
"dynamodb.table.env.tag.value": "dev",
"dynamodb.table.ingestion.tag.key": "datalake-ingest",
"dynamodb.table.whitelist": "",
"dynamodb.service.endpoint": "",

"kcl.table.billing.mode": "PROVISIONED",

"resource.tagging.service.endpoint": "",

"kafka.topic.prefix": "dynamodb-",
"tasks.max": "1",

@@ -44,8 +50,12 @@

`init.sync.delay.period` - time interval in seconds. Defines how long `INIT_SYNC` should delay execution before starting. This is used to give Kafka Connect tasks time to settle after a rebalance (since multiple task rebalances can happen in quick succession, and this would mean more duplicated data because the `INIT_SYNC` process won't have had time to mark its progress).

`connect.dynamodb.rediscovery.period` - time interval in milliseconds. Defines how often connector should try to find new DynamoDB tables (or detect removed ones). If changes are found tasks are automatically reconfigured.
`connect.dynamodb.rediscovery.period` - time interval in milliseconds. Defines how often connector should try to find new DynamoDB tables (or detect removed ones). If changes are found tasks are automatically reconfigured.

`dynamodb.service.endpoint` - AWS DynamoDB API endpoint. The default AWS endpoint is used if not set.

`resource.tagging.service.endpoint` - AWS Resource Group Tagging API endpoint. The default AWS endpoint is used if not set.

`kcl.table.billing.mode` - Defines the billing mode for the internal table created by the KCL library. Default is `PROVISIONED`.

`dynamodb.table.whitelist` - Defines a whitelist of DynamoDB table names. This overrides table auto-discovery by ingestion tag.
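As a sketch of the new options used together — the endpoint URLs below assume a local setup (e.g. DynamoDB Local and LocalStack) and are not part of the original docs:

```json
{
  "dynamodb.service.endpoint": "http://localhost:8000",
  "resource.tagging.service.endpoint": "http://localhost:4566",
  "kcl.table.billing.mode": "PAY_PER_REQUEST",
  "dynamodb.table.whitelist": "orders,customers"
}
```

Leaving both endpoints empty falls back to the default AWS endpoints.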
3 changes: 2 additions & 1 deletion source/build.gradle
@@ -6,5 +6,6 @@ dependencies {

compile group: 'org.apache.kafka', name: 'connect-api', version: "${rootProject.ext.kafkaConnectApiVersion}"
compile group: 'com.amazonaws', name: 'amazon-kinesis-client', version: '1.9.1'
compile group: 'com.amazonaws', name: 'dynamodb-streams-kinesis-adapter', version: '1.4.0'
compile group: 'com.amazonaws', name: 'dynamodb-streams-kinesis-adapter', version: '1.5.2'
compile group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.11.877'
}
@@ -2,8 +2,9 @@

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.resourcegroupstaggingapi.AWSResourceGroupsTaggingAPI;
import com.trustpilot.connector.dynamodb.aws.DynamoDBTablesProvider;
import com.trustpilot.connector.dynamodb.aws.AwsClients;
import com.trustpilot.connector.dynamodb.aws.ConfigTablesProvider;
import com.trustpilot.connector.dynamodb.aws.DynamoDBTablesProvider;
import com.trustpilot.connector.dynamodb.aws.TablesProvider;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectorContext;
@@ -54,20 +55,26 @@ public void start(Map<String, String> properties) {

AWSResourceGroupsTaggingAPI groupsTaggingAPIClient =
AwsClients.buildAWSResourceGroupsTaggingAPIClient(config.getAwsRegion(),
config.getAwsAccessKeyId(),
config.getAwsSecretKey());
config.getResourceTaggingServiceEndpoint(),
config.getAwsAccessKeyIdValue(),
config.getAwsSecretKeyValue());

AmazonDynamoDB dynamoDBClient = AwsClients.buildDynamoDbClient(config.getAwsRegion(),
config.getAwsAccessKeyId(),
config.getAwsSecretKey());
config.getDynamoDBServiceEndpoint(),
config.getAwsAccessKeyIdValue(),
config.getAwsSecretKeyValue());

if (tablesProvider == null) {
tablesProvider = new DynamoDBTablesProvider(
groupsTaggingAPIClient,
dynamoDBClient,
config.getSrcDynamoDBIngestionTagKey(),
config.getSrcDynamoDBEnvTagKey(),
config.getSrcDynamoDBEnvTagValue());
if (config.getWhitelistTables() != null) {
tablesProvider = new ConfigTablesProvider(dynamoDBClient, config);
} else {
tablesProvider = new DynamoDBTablesProvider(
groupsTaggingAPIClient,
dynamoDBClient,
config.getSrcDynamoDBIngestionTagKey(),
config.getSrcDynamoDBEnvTagKey(),
config.getSrcDynamoDBEnvTagValue());
}
}

startBackgroundReconfigurationTasks(this.context, config.getRediscoveryPeriod());
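The branch above can be illustrated in isolation. The sketch below is hypothetical — the class and string return values stand in for the connector's real provider types — and shows the rule the diff introduces: when a whitelist is configured, the config-based provider takes precedence over tag-based discovery.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the provider-selection rule shown in the diff:
// an explicit table whitelist takes precedence over tag-based discovery.
public class ProviderSelection {

    static String chooseProvider(List<String> whitelist) {
        // Mirrors the connector's branch: ConfigTablesProvider when a
        // whitelist is present, DynamoDBTablesProvider otherwise.
        return whitelist != null ? "ConfigTablesProvider" : "DynamoDBTablesProvider";
    }

    public static void main(String[] args) {
        System.out.println(chooseProvider(Arrays.asList("orders", "customers")));
        System.out.println(chooseProvider(null));
    }
}
```

Note that (as in the committed code) only null-ness is checked here, so an empty-but-present list would still select the config-based provider.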
@@ -1,8 +1,11 @@
package com.trustpilot.connector.dynamodb;

import com.amazonaws.services.dynamodbv2.model.BillingMode;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.types.Password;

import java.util.List;
import java.util.Map;

public class DynamoDBSourceConnectorConfig extends AbstractConfig {
@@ -22,12 +25,12 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig {
public static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access.key.id";
public static final String AWS_ACCESS_KEY_ID_DOC = "Explicit AWS access key ID. Leave empty to utilize the default credential provider chain.";
public static final String AWS_ACCESS_KEY_ID_DISPLAY = "Access key id";
public static final Object AWS_ACCESS_KEY_ID_DEFAULT = null;
public static final Password AWS_ACCESS_KEY_ID_DEFAULT = null;

public static final String AWS_SECRET_KEY_CONFIG = "aws.secret.key";
public static final String AWS_SECRET_KEY_DOC = "Explicit AWS secret access key. Leave empty to utilize the default credential provider chain.";
public static final String AWS_SECRET_KEY_DISPLAY = "Secret key";
public static final Object AWS_SECRET_KEY_DEFAULT = null;
public static final Password AWS_SECRET_KEY_DEFAULT = null;

public static final String SRC_DYNAMODB_TABLE_INGESTION_TAG_KEY_CONFIG = "dynamodb.table.ingestion.tag.key";
public static final String SRC_DYNAMODB_TABLE_INGESTION_TAG_KEY_DOC = "Define DynamoDB table tag name. Only tables with this tag key will be ingested.";
@@ -44,6 +47,16 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig {
public static final String SRC_DYNAMODB_TABLE_ENV_TAG_VALUE_DISPLAY = "Environment";
public static final String SRC_DYNAMODB_TABLE_ENV_TAG_VALUE_DEFAULT = "dev";

public static final String SRC_DYNAMODB_TABLE_WHITELIST_CONFIG = "dynamodb.table.whitelist";
public static final String SRC_DYNAMODB_TABLE_WHITELIST_DOC = "Define whitelist of dynamodb table names. This overrides table auto-discovery by ingestion tag.";
public static final String SRC_DYNAMODB_TABLE_WHITELIST_DISPLAY = "Tables whitelist";
public static final String SRC_DYNAMODB_TABLE_WHITELIST_DEFAULT = null;

public static final String SRC_KCL_TABLE_BILLING_MODE_CONFIG = "kcl.table.billing.mode";
public static final String SRC_KCL_TABLE_BILLING_MODE_DOC = "Define billing mode for internal table created by the KCL library. Default is provisioned.";
public static final String SRC_KCL_TABLE_BILLING_MODE_DISPLAY = "KCL table billing mode";
public static final String SRC_KCL_TABLE_BILLING_MODE_DEFAULT = "PROVISIONED";

public static final String DST_TOPIC_PREFIX_CONFIG = "kafka.topic.prefix";
public static final String DST_TOPIC_PREFIX_DOC = "Define Kafka topic destination prefix. End will be the name of a table.";
public static final String DST_TOPIC_PREFIX_DISPLAY = "Topic prefix";
@@ -55,6 +68,16 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig {
public static final String REDISCOVERY_PERIOD_DISPLAY = "Rediscovery period";
public static final long REDISCOVERY_PERIOD_DEFAULT = 1 * 60 * 1000; // 1 minute

public static final String AWS_RESOURCE_TAGGING_API_ENDPOINT_CONFIG = "resource.tagging.service.endpoint";
public static final String AWS_RESOURCE_TAGGING_API_ENDPOINT_DOC = "AWS Resource Group Tag API Endpoint. Will use default AWS if not set.";
public static final String AWS_RESOURCE_TAGGING_API_ENDPOINT_DISPLAY = "AWS Resource Group Tag API Endpoint";
public static final String AWS_RESOURCE_TAGGING_API_ENDPOINT_DEFAULT = null;

public static final String AWS_DYNAMODB_SERVICE_ENDPOINT_CONFIG = "dynamodb.service.endpoint";
public static final String AWS_DYNAMODB_SERVICE_ENDPOINT_DOC = "AWS DynamoDB API Endpoint. Will use default AWS if not set.";
public static final String AWS_DYNAMODB_SERVICE_ENDPOINT_DISPLAY = "AWS DynamoDB API Endpoint";
public static final String AWS_DYNAMODB_SERVICE_ENDPOINT_DEFAULT = null;

static final ConfigDef config = baseConfigDef();

public DynamoDBSourceConnectorConfig(Map<String, String> props) {
@@ -122,6 +145,42 @@ public static ConfigDef baseConfigDef() {
ConfigDef.Width.MEDIUM,
SRC_DYNAMODB_TABLE_ENV_TAG_VALUE_DISPLAY)

.define(AWS_DYNAMODB_SERVICE_ENDPOINT_CONFIG,
ConfigDef.Type.STRING,
AWS_DYNAMODB_SERVICE_ENDPOINT_DEFAULT,
ConfigDef.Importance.LOW,
AWS_DYNAMODB_SERVICE_ENDPOINT_DOC,
AWS_GROUP, 7,
ConfigDef.Width.MEDIUM,
AWS_DYNAMODB_SERVICE_ENDPOINT_DISPLAY)

.define(AWS_RESOURCE_TAGGING_API_ENDPOINT_CONFIG,
ConfigDef.Type.STRING,
AWS_RESOURCE_TAGGING_API_ENDPOINT_DEFAULT,
ConfigDef.Importance.LOW,
AWS_RESOURCE_TAGGING_API_ENDPOINT_DOC,
AWS_GROUP, 8,
ConfigDef.Width.MEDIUM,
AWS_RESOURCE_TAGGING_API_ENDPOINT_DISPLAY)

.define(SRC_DYNAMODB_TABLE_WHITELIST_CONFIG,
ConfigDef.Type.LIST,
SRC_DYNAMODB_TABLE_WHITELIST_DEFAULT,
ConfigDef.Importance.LOW,
SRC_DYNAMODB_TABLE_WHITELIST_DOC,
AWS_GROUP, 8,
ConfigDef.Width.MEDIUM,
SRC_DYNAMODB_TABLE_WHITELIST_DISPLAY)

.define(SRC_KCL_TABLE_BILLING_MODE_CONFIG,
ConfigDef.Type.STRING,
SRC_KCL_TABLE_BILLING_MODE_DEFAULT,
ConfigDef.Importance.LOW,
SRC_KCL_TABLE_BILLING_MODE_DOC,
AWS_GROUP, 9,
ConfigDef.Width.MEDIUM,
SRC_KCL_TABLE_BILLING_MODE_DISPLAY)

.define(DST_TOPIC_PREFIX_CONFIG,
ConfigDef.Type.STRING,
DST_TOPIC_PREFIX_DEFAULT,
@@ -148,8 +207,8 @@ public static ConfigDef baseConfigDef() {
CONNECTOR_GROUP, 4,
ConfigDef.Width.MEDIUM,
REDISCOVERY_PERIOD_DISPLAY)

;

}

public static void main(String[] args) {
Expand All @@ -160,12 +219,20 @@ public String getAwsRegion() {
return getString(AWS_REGION_CONFIG);
}

public String getAwsAccessKeyId() {
return getString(AWS_ACCESS_KEY_ID_CONFIG);
public Password getAwsAccessKeyId() {
return getPassword(AWS_ACCESS_KEY_ID_CONFIG);
}

public String getAwsAccessKeyIdValue() {
return getPassword(AWS_ACCESS_KEY_ID_CONFIG) == null ? null : getPassword(AWS_ACCESS_KEY_ID_CONFIG).value();
}

public Password getAwsSecretKey() {
return getPassword(AWS_SECRET_KEY_CONFIG);
}

public String getAwsSecretKey() {
return getString(AWS_SECRET_KEY_CONFIG);
public String getAwsSecretKeyValue() {
return getPassword(AWS_SECRET_KEY_CONFIG) == null ? null : getPassword(AWS_SECRET_KEY_CONFIG).value();
}

public String getSrcDynamoDBIngestionTagKey() {
@@ -189,4 +256,20 @@ public long getRediscoveryPeriod() {
public int getInitSyncDelay() {
return (int)get(SRC_INIT_SYNC_DELAY_CONFIG);
}

public String getDynamoDBServiceEndpoint() {
return getString(AWS_DYNAMODB_SERVICE_ENDPOINT_CONFIG);
}

public String getResourceTaggingServiceEndpoint() {
return getString(AWS_RESOURCE_TAGGING_API_ENDPOINT_CONFIG);
}

public List<String> getWhitelistTables() {
return getList(SRC_DYNAMODB_TABLE_WHITELIST_CONFIG) != null ? getList(SRC_DYNAMODB_TABLE_WHITELIST_CONFIG) : null;
}

public BillingMode getKCLTableBillingMode() {
return BillingMode.fromValue(getString(SRC_KCL_TABLE_BILLING_MODE_CONFIG));
}
}
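`getKCLTableBillingMode` delegates validation to `BillingMode.fromValue`, which in the AWS SDK rejects strings other than the defined constants. Below is a self-contained sketch of that fail-fast behavior — the enum and class are illustrative stand-ins, not the SDK types:

```java
// Illustrative stand-in for the AWS SDK's BillingMode.fromValue:
// unknown values fail fast instead of silently falling back to a default.
public class BillingModeCheck {

    enum Mode { PROVISIONED, PAY_PER_REQUEST }

    static Mode parse(String value) {
        for (Mode m : Mode.values()) {
            if (m.name().equals(value)) {
                return m;
            }
        }
        throw new IllegalArgumentException("Unknown billing mode: " + value);
    }

    public static void main(String[] args) {
        System.out.println(parse("PROVISIONED"));
        System.out.println(parse("PAY_PER_REQUEST"));
        // parse("on-demand") would throw IllegalArgumentException
    }
}
```

This is why the config doc names `PROVISIONED` as the default: any other spelling of the value surfaces as a startup error rather than a misconfigured KCL table.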
