
Feature/init sync #11

Merged: 7 commits, Mar 8, 2023
1 change: 1 addition & 0 deletions .travis.yml
@@ -1,3 +1,4 @@
dist: focal
language: java

before_cache:
5 changes: 3 additions & 2 deletions README.md
@@ -21,8 +21,8 @@ In our implementation we opted to use Amazon Kinesis Client with DynamoDB Stream
* Java 8
* Gradlew 5.3.1
* Kafka Connect Framework >= 2.1.1
* Amazon Kinesis Client 1.9.1
* DynamoDB Streams Kinesis Adapter 1.5.2
* Amazon Kinesis Client 1.13.1
* DynamoDB Streams Kinesis Adapter 1.5.3

## Documentation
* [Getting started](docs/getting-started.md)
@@ -44,6 +44,7 @@ In our implementation we opted to use Amazon Kinesis Client with DynamoDB Stream
* However, you will only encounter this issue when running many tasks on one machine under very high load.

* Synced (source) DynamoDB table unit capacity must be large enough to ensure `INIT_SYNC` finishes in around 16 hours. Otherwise there is a risk of `INIT_SYNC` being restarted as soon as it finishes, because DynamoDB Streams stores change events for only 24 hours.
* `INIT_SYNC` can be skipped with the `init.sync.skip=true` configuration

* Required AWS roles:
```json
2 changes: 2 additions & 0 deletions docs/details.md
@@ -13,6 +13,8 @@ This connector can sync multiple DynamoDB tables at the same time and it does so

`INIT_SYNC` is the process during which all existing table data is scanned and pushed into the destination Kafka topic. Usually this happens only once, after the source task for a specific table is started for the first time. But it can be repeated after unexpected issues, e.g. if the source connector was down for a long period of time and may have missed some of the change events from the table stream (DynamoDB Streams stores data for 24 hours only).

Setting `init.sync.skip=true` skips this process entirely; the connector will only ever read from the LATEST position in the stream.
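The resulting choice of stream start position can be sketched as follows (a minimal, self-contained Java sketch; the enum and class names here are hypothetical stand-ins, not the connector's actual types):

```java
// Hypothetical stand-in for the KCL's InitialPositionInStream values.
enum InitialPosition { TRIM_HORIZON, LATEST }

class InitSyncPosition {
    // When INIT_SYNC is skipped the table is never scanned, so starting at
    // TRIM_HORIZON (oldest retained record) would replay up to 24 hours of
    // changes without a base snapshot; LATEST is used instead.
    static InitialPosition choose(boolean initSyncSkip) {
        return initSyncSkip ? InitialPosition.LATEST : InitialPosition.TRIM_HORIZON;
    }

    public static void main(String[] args) {
        System.out.println(choose(true));  // LATEST
        System.out.println(choose(false)); // TRIM_HORIZON
    }
}
```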

### 3. "SYNC"

Once `INIT_SYNC` is finished, the source task switches into the DynamoDB Streams consumer state. All changes that happen to the source table are represented in this stream and copied over to the destination Kafka topic. Consumers of this topic can recreate the full state of the source table at any given time.
3 changes: 3 additions & 0 deletions docs/options.md
@@ -36,6 +36,7 @@
"tasks.max": "1",

"init.sync.delay.period": 60,
"init.sync.skip": false,
"connect.dynamodb.rediscovery.period": "60000"
}
```
@@ -53,6 +54,8 @@

`init.sync.delay.period` - time interval in seconds. Defines how long `INIT_SYNC` should delay execution before starting. This is used to give Kafka Connect tasks time to settle after a rebalance (since multiple task rebalances can happen in quick succession, which would mean more duplicated data because the `INIT_SYNC` process won't have time to mark its progress).

`init.sync.skip` - boolean. If `true`, the connector skips `INIT_SYNC` and starts reading from the latest stream position instead of scanning the entire table first.
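How such a flag is resolved from connector properties can be sketched as below (a hypothetical minimal helper; the real connector reads it through Kafka's `AbstractConfig`):

```java
import java.util.Map;

class InitSyncOptions {
    // Hypothetical parser: "init.sync.skip" falls back to "false" when the
    // key is absent, matching the documented default above.
    static boolean initSyncSkip(Map<String, String> props) {
        return Boolean.parseBoolean(props.getOrDefault("init.sync.skip", "false"));
    }

    public static void main(String[] args) {
        System.out.println(initSyncSkip(Map.of("init.sync.skip", "true"))); // true
        System.out.println(initSyncSkip(Map.of()));                         // false
    }
}
```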

`connect.dynamodb.rediscovery.period` - time interval in milliseconds. Defines how often the connector should try to find new DynamoDB tables (or detect removed ones). If changes are found, tasks are automatically reconfigured.

`dynamodb.service.endpoint` - AWS DynamoDB API Endpoint. Will use default AWS if not set.
4 changes: 2 additions & 2 deletions source/build.gradle
@@ -5,7 +5,7 @@ dependencies {
implementation 'com.amazonaws:aws-java-sdk-resourcegroupstaggingapi:1.11.551'

compile group: 'org.apache.kafka', name: 'connect-api', version: "${rootProject.ext.kafkaConnectApiVersion}"
compile group: 'com.amazonaws', name: 'amazon-kinesis-client', version: '1.9.1'
compile group: 'com.amazonaws', name: 'dynamodb-streams-kinesis-adapter', version: '1.5.2'
compile group: 'com.amazonaws', name: 'amazon-kinesis-client', version: '1.13.3'
compile group: 'com.amazonaws', name: 'dynamodb-streams-kinesis-adapter', version: '1.5.3'
compile group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.11.877'
}
@@ -17,6 +17,11 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig {
public static final String SRC_INIT_SYNC_DELAY_DISPLAY = "INIT_SYNC delay";
public static final int SRC_INIT_SYNC_DELAY_DEFAULT = 60;

public static final String SRC_INIT_SYNC_SKIP_CONFIG = "init.sync.skip";
public static final String SRC_INIT_SYNC_SKIP_DOC = "Defines whether to skip INIT_SYNC of the table.";
public static final String SRC_INIT_SYNC_SKIP_DISPLAY = "Skip INIT_SYNC";
public static final boolean SRC_INIT_SYNC_SKIP_DEFAULT = false;

public static final String AWS_REGION_CONFIG = "aws.region";
public static final String AWS_REGION_DOC = "Define AWS region.";
public static final String AWS_REGION_DISPLAY = "Region";
@@ -204,12 +209,21 @@ public static ConfigDef baseConfigDef() {
ConfigDef.Width.MEDIUM,
DST_TOPIC_PREFIX_DISPLAY)

.define(SRC_INIT_SYNC_SKIP_CONFIG,
ConfigDef.Type.BOOLEAN,
SRC_INIT_SYNC_SKIP_DEFAULT,
ConfigDef.Importance.LOW,
SRC_INIT_SYNC_SKIP_DOC,
CONNECTOR_GROUP, 2,
ConfigDef.Width.MEDIUM,
SRC_INIT_SYNC_SKIP_DISPLAY)

.define(SRC_INIT_SYNC_DELAY_CONFIG,
ConfigDef.Type.INT,
SRC_INIT_SYNC_DELAY_DEFAULT,
ConfigDef.Importance.LOW,
SRC_INIT_SYNC_DELAY_DOC,
CONNECTOR_GROUP, 2,
CONNECTOR_GROUP, 3,
Collaborator: What is 'CONNECTOR_GROUP'?

Author: This class is used to display configs in the UI on something like Confluent Hub (width, grouping, importance). No real use outside of that.

ConfigDef.Width.MEDIUM,
SRC_INIT_SYNC_DELAY_DISPLAY)

@@ -267,6 +281,10 @@ public long getRediscoveryPeriod() {
return getLong(REDISCOVERY_PERIOD_CONFIG);
}

public boolean getInitSyncSkip() {
return (boolean)get(SRC_INIT_SYNC_SKIP_CONFIG);
}

public int getInitSyncDelay() {
return (int)get(SRC_INIT_SYNC_DELAY_CONFIG);
}
@@ -89,6 +89,7 @@ public class DynamoDBSourceTask extends SourceTask {
private SourceInfo sourceInfo;
private TableDescription tableDesc;
private int initSyncDelay;
private boolean initSyncSkip;

@SuppressWarnings("unused")
//Used by Confluent platform to initialize connector
@@ -124,6 +125,7 @@ public void start(Map<String, String> configProperties) {
tableDesc = client.describeTable(config.getTableName()).getTable();

initSyncDelay = config.getInitSyncDelay();
initSyncSkip = config.getInitSyncSkip();

LOGGER.debug("Getting offset for table: {}", tableDesc.getTableName());
setStateFromOffset();
@@ -152,7 +154,14 @@ public void start(Map<String, String> configProperties) {
eventsQueue,
shardRegister);
}
kclWorker.start(client, dynamoDBStreamsClient, tableDesc.getTableName(), config.getTaskID(), config.getDynamoDBServiceEndpoint(), config.getKCLTableBillingMode());
kclWorker.start(client,
dynamoDBStreamsClient,
tableDesc.getTableName(),
config.getTaskID(),
config.getDynamoDBServiceEndpoint(),
config.getInitSyncSkip(),
config.getKCLTableBillingMode()
);

shutdown = false;
}
@@ -162,10 +171,13 @@ private void setStateFromOffset() {
.offset(Collections.singletonMap("table_name", tableDesc.getTableName()));
if (offset != null) {
sourceInfo = SourceInfo.fromOffset(offset, clock);
if (initSyncSkip) {
sourceInfo.skipInitSync();
}
} else {
LOGGER.debug("No stored offset found for table: {}", tableDesc.getTableName());
sourceInfo = new SourceInfo(tableDesc.getTableName(), clock);
sourceInfo.startInitSync(); // InitSyncStatus always needs to run after adding new table
sourceInfo.startInitSync();
}
}

@@ -196,6 +208,9 @@ public List<SourceRecord> poll() throws InterruptedException {
if (sourceInfo.initSyncStatus == InitSyncStatus.FINISHED) {
return sync();
}
if (sourceInfo.initSyncStatus == InitSyncStatus.SKIPPED) {
return sync();
}
throw new Exception("Invalid SourceInfo InitSyncStatus state: " + sourceInfo.initSyncStatus);
} catch (InterruptedException ex) {
LOGGER.error("Failed to handle incoming records. Records dropped!", ex);
@@ -279,6 +294,7 @@ private List<SourceRecord> sync() throws Exception {
LOGGER.debug("Waiting for records from eventsQueue for table: {}", tableDesc.getTableName());
KclRecordsWrapper dynamoDBRecords = eventsQueue.poll(500, TimeUnit.MILLISECONDS);
if (dynamoDBRecords == null) {
LOGGER.debug("null dynamoDBRecords");
return null; // returning thread control at regular intervals
}

@@ -312,12 +328,12 @@
}

// Received record which is behind "safe" zone. Indicating that "potentially" we lost some records.
// Need to resync...
// Need to resync if sync hasn't been skipped...
// This happens if:
// * connector was down for some time
// * connector is lagging
// * connector failed to finish init sync in acceptable time frame
if (recordIsInDangerZone(arrivalTimestamp)) {
if (recordIsInDangerZone(arrivalTimestamp) && sourceInfo.initSyncStatus != InitSyncStatus.SKIPPED) {
sourceInfo.startInitSync();

LOGGER.info(
@@ -3,5 +3,6 @@
public enum InitSyncStatus {
UNDEFINED,
RUNNING,
FINISHED
FINISHED,
SKIPPED
}
@@ -71,6 +71,14 @@ public void endInitSync() {
lastInitSyncEnd = Instant.now(clock);
}

public void skipInitSync() {
initSyncStatus = InitSyncStatus.SKIPPED;
lastInitSyncStart = Instant.ofEpochSecond(0);
lastInitSyncEnd = Instant.ofEpochSecond(0);
exclusiveStartKey = null;
initSyncCount = 0L;
}

private static final Schema STRUCT_SCHEMA = SchemaBuilder.struct()
.name(SchemaNameAdjuster
.defaultAdjuster()
@@ -94,6 +94,11 @@ public GetMetricStatisticsResult getMetricStatistics(GetMetricStatisticsRequest
return null;
}

@Override
public GetMetricStreamResult getMetricStream(GetMetricStreamRequest getMetricStreamRequest) {
return null;
}

@Override
public GetMetricWidgetImageResult getMetricWidgetImage(GetMetricWidgetImageRequest getMetricWidgetImageRequest) {
return null;
@@ -119,6 +124,11 @@ public PutAnomalyDetectorResult putAnomalyDetector(PutAnomalyDetectorRequest put
return null;
}

@Override
public PutCompositeAlarmResult putCompositeAlarm(PutCompositeAlarmRequest putCompositeAlarmRequest) {
return null;
}

@Override
public PutMetricAlarmResult putMetricAlarm(PutMetricAlarmRequest putMetricAlarmRequest) {
return null;
@@ -129,11 +139,26 @@ public PutMetricDataResult putMetricData(PutMetricDataRequest putMetricDataReque
return null;
}

@Override
public PutMetricStreamResult putMetricStream(PutMetricStreamRequest putMetricStreamRequest) {
return null;
}

@Override
public SetAlarmStateResult setAlarmState(SetAlarmStateRequest setAlarmStateRequest) {
return null;
}

@Override
public StartMetricStreamsResult startMetricStreams(StartMetricStreamsRequest startMetricStreamsRequest) {
return null;
}

@Override
public StopMetricStreamsResult stopMetricStreams(StopMetricStreamsRequest stopMetricStreamsRequest) {
return null;
}

@Override
public TagResourceResult tagResource(TagResourceRequest tagResourceRequest) {
return null;
@@ -169,6 +194,11 @@ public DeleteInsightRulesResult deleteInsightRules(DeleteInsightRulesRequest del
return null;
}

@Override
public DeleteMetricStreamResult deleteMetricStream(DeleteMetricStreamRequest deleteMetricStreamRequest) {
return null;
}

@Override
public GetDashboardResult getDashboard(GetDashboardRequest getDashboardRequest) {
return null;
@@ -189,6 +219,11 @@ public ListDashboardsResult listDashboards(ListDashboardsRequest listDashboardsR
return null;
}

@Override
public ListMetricStreamsResult listMetricStreams(ListMetricStreamsRequest listMetricStreamsRequest) {
return null;
}

@Override
public PutDashboardResult putDashboard(PutDashboardRequest putDashboardRequest) {
return null;
@@ -10,6 +10,7 @@ void start(AmazonDynamoDB dynamoDBClient,
String tableName,
String taskid,
String endpoint,
Boolean isSkipSync,
BillingMode kclTablebillingMode);

void stop();
@@ -30,7 +30,6 @@ public class KclWorkerImpl implements KclWorker {
private final AWSCredentialsProvider awsCredentialsProvider;
private final ArrayBlockingQueue<KclRecordsWrapper> eventsQueue;
private final ConcurrentHashMap<String, ShardInfo> recordProcessorsRegister;

private volatile Thread thread;
private volatile Worker worker;

@@ -49,13 +48,14 @@ public void start(AmazonDynamoDB dynamoDBClient,
String tableName,
String taskid,
String endpoint,
Boolean isSkipSync,
BillingMode kclTableBillingMode) {
IRecordProcessorFactory recordProcessorFactory = new KclRecordProcessorFactory(tableName, eventsQueue,
recordProcessorsRegister);

KinesisClientLibConfiguration clientLibConfiguration = getClientLibConfiguration(tableName,
taskid,
dynamoDBClient, endpoint, kclTableBillingMode);
dynamoDBClient, endpoint, isSkipSync, kclTableBillingMode);

AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoDBStreamsClient);

@@ -123,8 +123,15 @@ KinesisClientLibConfiguration getClientLibConfiguration(String tableName,
String taskid,
AmazonDynamoDB dynamoDBClient,
String endpoint,
Boolean isSkipSync,
BillingMode kclTableBillingMode) {

InitialPositionInStream initialPosition;
if (isSkipSync) {
initialPosition = InitialPositionInStream.LATEST;
} else {
initialPosition = InitialPositionInStream.TRIM_HORIZON;
}
String streamArn = dynamoDBClient.describeTable(
new DescribeTableRequest()
.withTableName(tableName)).getTable().getLatestStreamArn();
@@ -141,7 +148,7 @@ KinesisClientLibConfiguration getClientLibConfiguration(String tableName,

// worker will use checkpoint tableName if available, otherwise it is safer
// to start at beginning of the stream
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
.withInitialPositionInStream(initialPosition)

// we want the maximum batch size to avoid network transfer latency overhead
.withMaxRecords(Constants.STREAMS_RECORDS_LIMIT)