From 50c396cda297faf7ccbfaf3fb19e788d22aabb91 Mon Sep 17 00:00:00 2001 From: Trustpilot Robot User Date: Tue, 21 Jun 2022 16:57:47 +0200 Subject: [PATCH 1/6] Update_package_versions (#21) * update kcl and dynamo packages * add dist property to travis file * override abstract CW methods Co-authored-by: fal-trustpilot <69148060+fal-trustpilot@users.noreply.github.com> --- .travis.yml | 1 + source/build.gradle | 4 +-- .../dynamodb/kcl/KclNoopCloudWatch.java | 35 +++++++++++++++++++ 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 306dba1..584e74e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,4 @@ +dist: focal language: java before_cache: diff --git a/source/build.gradle b/source/build.gradle index d8faae6..99c0d0d 100644 --- a/source/build.gradle +++ b/source/build.gradle @@ -5,7 +5,7 @@ dependencies { implementation 'com.amazonaws:aws-java-sdk-resourcegroupstaggingapi:1.11.551' compile group: 'org.apache.kafka', name: 'connect-api', version: "${rootProject.ext.kafkaConnectApiVersion}" - compile group: 'com.amazonaws', name: 'amazon-kinesis-client', version: '1.9.1' - compile group: 'com.amazonaws', name: 'dynamodb-streams-kinesis-adapter', version: '1.5.2' + compile group: 'com.amazonaws', name: 'amazon-kinesis-client', version: '1.13.3' + compile group: 'com.amazonaws', name: 'dynamodb-streams-kinesis-adapter', version: '1.5.3' compile group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.11.877' } \ No newline at end of file diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclNoopCloudWatch.java b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclNoopCloudWatch.java index 2c960bf..182fd25 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclNoopCloudWatch.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclNoopCloudWatch.java @@ -94,6 +94,11 @@ public GetMetricStatisticsResult getMetricStatistics(GetMetricStatisticsRequest return null; } + @Override + public GetMetricStreamResult getMetricStream(GetMetricStreamRequest getMetricStreamRequest) { + return null; + } + @Override public GetMetricWidgetImageResult getMetricWidgetImage(GetMetricWidgetImageRequest getMetricWidgetImageRequest) { return null; @@ -119,6 +124,11 @@ public PutAnomalyDetectorResult putAnomalyDetector(PutAnomalyDetectorRequest put return null; } + @Override + public PutCompositeAlarmResult putCompositeAlarm(PutCompositeAlarmRequest putCompositeAlarmRequest) { + return null; + } + @Override public PutMetricAlarmResult putMetricAlarm(PutMetricAlarmRequest putMetricAlarmRequest) { return null; @@ -129,11 +139,26 @@ public PutMetricDataResult putMetricData(PutMetricDataRequest putMetricDataReque return null; } + @Override + public PutMetricStreamResult putMetricStream(PutMetricStreamRequest putMetricStreamRequest) { + return null; + } + @Override public SetAlarmStateResult setAlarmState(SetAlarmStateRequest setAlarmStateRequest) { return null; } + @Override + public StartMetricStreamsResult startMetricStreams(StartMetricStreamsRequest startMetricStreamsRequest) { + return null; + } + + @Override + public StopMetricStreamsResult stopMetricStreams(StopMetricStreamsRequest stopMetricStreamsRequest) { + return null; + } + @Override public TagResourceResult tagResource(TagResourceRequest tagResourceRequest) { return null; @@ -169,6 +194,11 @@ public DeleteInsightRulesResult deleteInsightRules(DeleteInsightRulesRequest del return null; } + @Override + public DeleteMetricStreamResult deleteMetricStream(DeleteMetricStreamRequest deleteMetricStreamRequest) { + return null; + } + @Override public GetDashboardResult getDashboard(GetDashboardRequest getDashboardRequest) { return null; @@ -189,6 +219,11 @@ public ListDashboardsResult listDashboards(ListDashboardsRequest listDashboardsR return null; } + @Override + public ListMetricStreamsResult listMetricStreams(ListMetricStreamsRequest listMetricStreamsRequest) { + return null; + } + @Override public PutDashboardResult putDashboard(PutDashboardRequest putDashboardRequest) { return null; From 9dcbf4211bb78f4c53cce0ad711fb735e08e5dbd Mon Sep 17 00:00:00 2001 From: Ryan Clark Date: Tue, 28 Feb 2023 18:04:35 -0600 Subject: [PATCH 2/6] add init sync skip with new status and initial position in stream --- README.md | 5 +- docs/details.md | 2 + docs/options.md | 3 + .../DynamoDBSourceConnectorConfig.java | 20 +++- .../dynamodb/DynamoDBSourceTask.java | 19 +++- .../connector/dynamodb/InitSyncStatus.java | 3 +- .../connector/dynamodb/kcl/KclWorker.java | 1 + .../connector/dynamodb/kcl/KclWorkerImpl.java | 13 ++- .../dynamodb/DynamoDBSourceTaskTests.java | 107 +++++++++++++++++- .../dynamodb/kcl/KclWorkerImplTests.java | 3 +- 10 files changed, 164 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 5fe3307..fe0ef6b 100644 --- a/README.md +++ b/README.md @@ -21,8 +21,8 @@ In our implementation we opted to use Amazon Kinesis Client with DynamoDB Stream * Java 8 * Gradlew 5.3.1 * Kafka Connect Framework >= 2.1.1 -* Amazon Kinesis Client 1.9.1 -* DynamoDB Streams Kinesis Adapter 1.5.2 +* Amazon Kinesis Client 1.13.1 +* DynamoDB Streams Kinesis Adapter 1.5.3 ## Documentation * [Getting started](docs/getting-started.md) @@ -44,6 +44,7 @@ In our implementation we opted to use Amazon Kinesis Client with DynamoDB Stream * However you will only encounter this issue by running lots of tasks on one machine with really high load. * Synced(Source) DynamoDB table unit capacity must be large enough to ensure `INIT_SYNC` to be finished in around 16 hours. Otherwise there is a risk `INIT_SYNC` being restarted just as soon as it's finished because DynamoDB Streams store change events only for 24 hours. + * `INIT_SYNC` can be skipped with `init.sync.skip=true` configuration * Required AWS roles: ```json diff --git a/docs/details.md b/docs/details.md index c95aafb..8d95c97 100644 --- a/docs/details.md +++ b/docs/details.md @@ -13,6 +13,8 @@ This connector can sync multiple DynamoDB tables at the same time and it does so `INIT_SYNC` is a process when all existing table data is scanned and pushed into Kafka destination topic. Usually this happens only once after source task for specific table is started for the first time. But it can be repeated in case of unexpected issues, e.g. if source connector was down for long period of time and it is possible that it has missed some of the change events from the table stream (DynamoDB streams store data for 24 hours only). +Using `init.sync.skip` will skip this process and the connector will only ever read from the LATEST position in the stream. + ### 3. "SYNC" Once `INIT_SYNC` is finished source task switches into DynamoDB Streams consumer state. There all changes that happen to the source table are represented in this stream and copied over to the Kafka's destination topic. Consumers of this topic can recreate full state of the source table at any given time. diff --git a/docs/options.md b/docs/options.md index a0c6e4e..4bacf77 100644 --- a/docs/options.md +++ b/docs/options.md @@ -36,6 +36,7 @@ "tasks.max": "1", "init.sync.delay.period": 60, + "init.sync.skip": false, "connect.dynamodb.rediscovery.period": "60000" } ``` @@ -53,6 +54,8 @@ `init.sync.delay.period` - time interval in seconds. Defines how long `INIT_SYNC` should delay execution before starting. This is used to give time for Kafka Connect tasks to calm down after rebalance (Since multiple tasks rebalances can happen in quick succession and this would mean more duplicated data since `INIT_SYNC` process won't have time mark it's progress). +`init.sync.skip` - boolean to determine whether to start the connector reading the entire table or from the latest offset. + `connect.dynamodb.rediscovery.period` - time interval in milliseconds. Defines how often connector should try to find new DynamoDB tables (or detect removed ones). If changes are found tasks are automatically reconfigured. `dynamodb.service.endpoint` - AWS DynamoDB API Endpoint. Will use default AWS if not set. diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java index 86299be..f077930 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java @@ -17,6 +17,11 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig { public static final String SRC_INIT_SYNC_DELAY_DISPLAY = "INIT_SYNC delay"; public static final int SRC_INIT_SYNC_DELAY_DEFAULT = 60; + public static final String SRC_INIT_SYNC_SKIP_CONFIG = "init.sync.skip"; + public static final String SRC_INIT_SYNC_SKIP_DOC = "Define whether to skip INIT_SYNC of table."; + public static final String SRC_INIT_SYNC_SKIP_DISPLAY = "Skip INIT_SYNC"; + public static final boolean SRC_INIT_SYNC_SKIP_DEFAULT = false; + public static final String AWS_REGION_CONFIG = "aws.region"; public static final String AWS_REGION_DOC = "Define AWS region."; public static final String AWS_REGION_DISPLAY = "Region"; @@ -204,12 +209,21 @@ public static ConfigDef baseConfigDef() { ConfigDef.Width.MEDIUM, DST_TOPIC_PREFIX_DISPLAY) + .define(SRC_INIT_SYNC_SKIP_CONFIG, + ConfigDef.Type.BOOLEAN, + SRC_INIT_SYNC_SKIP_DEFAULT, + ConfigDef.Importance.LOW, + SRC_INIT_SYNC_SKIP_DOC, + CONNECTOR_GROUP, 2, + ConfigDef.Width.MEDIUM, + SRC_INIT_SYNC_SKIP_DISPLAY) + .define(SRC_INIT_SYNC_DELAY_CONFIG, ConfigDef.Type.INT, SRC_INIT_SYNC_DELAY_DEFAULT, ConfigDef.Importance.LOW, SRC_INIT_SYNC_DELAY_DOC, - CONNECTOR_GROUP, 2, + CONNECTOR_GROUP, 3, ConfigDef.Width.MEDIUM, SRC_INIT_SYNC_DELAY_DISPLAY) @@ -267,6 +281,10 @@ public long getRediscoveryPeriod() { return getLong(REDISCOVERY_PERIOD_CONFIG); } + public boolean getInitSyncSkip() { + return (boolean)get(SRC_INIT_SYNC_SKIP_CONFIG); + } + public int getInitSyncDelay() { return (int)get(SRC_INIT_SYNC_DELAY_CONFIG); } diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java index 0f53fdc..98729cb 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java @@ -152,7 +152,14 @@ public void start(Map configProperties) { eventsQueue, shardRegister); } - kclWorker.start(client, dynamoDBStreamsClient, tableDesc.getTableName(), config.getTaskID(), config.getDynamoDBServiceEndpoint(), config.getKCLTableBillingMode()); + kclWorker.start(client, + dynamoDBStreamsClient, + tableDesc.getTableName(), + config.getTaskID(), + config.getDynamoDBServiceEndpoint(), + config.getInitSyncSkip(), + config.getKCLTableBillingMode() + ); shutdown = false; } @@ -196,6 +203,9 @@ public List poll() throws InterruptedException { if (sourceInfo.initSyncStatus == InitSyncStatus.FINISHED) { return sync(); } + if (sourceInfo.initSyncStatus == InitSyncStatus.SKIPPED) { + return sync(); + } throw new Exception("Invalid SourceInfo InitSyncStatus state: " + sourceInfo.initSyncStatus); } catch (InterruptedException ex) { LOGGER.error("Failed to handle incoming records. Records dropped!", ex); @@ -213,6 +223,8 @@ public List poll() throws InterruptedException { * {@link SourceInfo}. */ private LinkedList initSync() throws Exception { + // TODO: remove log + LOGGER.info("init sync running"); if (sourceInfo.lastInitSyncStart.compareTo(Instant.now(clock).minus(Duration.ofHours(19))) <= 0) { LOGGER.error("Current INIT_SYNC took over 19 hours. Restarting INIT_SYNC! {}", sourceInfo); sourceInfo.startInitSync(); @@ -279,6 +291,7 @@ private List sync() throws Exception { LOGGER.debug("Waiting for records from eventsQueue for table: {}", tableDesc.getTableName()); KclRecordsWrapper dynamoDBRecords = eventsQueue.poll(500, TimeUnit.MILLISECONDS); if (dynamoDBRecords == null) { + LOGGER.debug("null dynamoDBRecords"); return null; // returning thread control at regular intervals } @@ -312,12 +325,12 @@ private List sync() throws Exception { } // Received record which is behind "safe" zone. Indicating that "potentially" we lost some records. - // Need to resync... + // Need to resync if sync hasn't been skipped... // This happens if: // * connector was down for some time // * connector is lagging // * connector failed to finish init sync in acceptable time frame - if (recordIsInDangerZone(arrivalTimestamp)) { + if (recordIsInDangerZone(arrivalTimestamp) && sourceInfo.initSyncStatus != InitSyncStatus.SKIPPED) { sourceInfo.startInitSync(); LOGGER.info( diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/InitSyncStatus.java b/source/src/main/java/com/trustpilot/connector/dynamodb/InitSyncStatus.java index 2ab048c..be6791a 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/InitSyncStatus.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/InitSyncStatus.java @@ -3,5 +3,6 @@ public enum InitSyncStatus { UNDEFINED, RUNNING, - FINISHED + FINISHED, + SKIPPED } diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorker.java b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorker.java index 27557bb..98026ba 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorker.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorker.java @@ -10,6 +10,7 @@ void start(AmazonDynamoDB dynamoDBClient, String tableName, String taskid, String endpoint, + Boolean isSkipSync, BillingMode kclTablebillingMode); void stop(); diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java index f6bd839..71dcfe4 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImpl.java @@ -30,7 +30,6 @@ public class KclWorkerImpl implements KclWorker { private final AWSCredentialsProvider awsCredentialsProvider; private final ArrayBlockingQueue eventsQueue; private final ConcurrentHashMap recordProcessorsRegister; - private volatile Thread thread; private volatile Worker worker; @@ -49,13 +48,14 @@ public void start(AmazonDynamoDB dynamoDBClient, String tableName, String taskid, String endpoint, + Boolean isSkipSync, BillingMode kclTableBillingMode) { IRecordProcessorFactory recordProcessorFactory = new KclRecordProcessorFactory(tableName, eventsQueue, recordProcessorsRegister); KinesisClientLibConfiguration clientLibConfiguration = getClientLibConfiguration(tableName, taskid, - dynamoDBClient, endpoint, kclTableBillingMode); + dynamoDBClient, endpoint, isSkipSync, kclTableBillingMode); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoDBStreamsClient); @@ -123,8 +123,15 @@ KinesisClientLibConfiguration getClientLibConfiguration(String tableName, String taskid, AmazonDynamoDB dynamoDBClient, String endpoint, + Boolean isSkipSync, BillingMode kclTableBillingMode) { + InitialPositionInStream initialPosition; + if (isSkipSync) { + initialPosition = InitialPositionInStream.LATEST; + } else { + initialPosition = InitialPositionInStream.TRIM_HORIZON; + } String streamArn = dynamoDBClient.describeTable( new DescribeTableRequest() .withTableName(tableName)).getTable().getLatestStreamArn(); @@ -141,7 +148,7 @@ KinesisClientLibConfiguration getClientLibConfiguration(String tableName, // worker will use checkpoint tableName if available, otherwise it is safer // to start at beginning of the stream - .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) + .withInitialPositionInStream(initialPosition) // we want the maximum batch size to avoid network transfer latency overhead .withMaxRecords(Constants.STREAMS_RECORDS_LIMIT) diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index 54623cb..e179d9d 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -15,6 +15,8 @@ import org.mockito.ArgumentMatchers; import org.mockito.Mockito; import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Clock; import java.time.Duration; @@ -29,7 +31,8 @@ @SuppressWarnings("ConstantConditions") public class DynamoDBSourceTaskTests { private final static String tableName = "testTable1"; - + // TODO: remove logger + private static final Logger LOGGER = LoggerFactory.getLogger(DynamoDBSourceTaskTests.class); private HashMap configs; @BeforeEach @@ -208,6 +211,7 @@ public void kclWorkerIsStartedOnStart() throws InterruptedException { eq(tableName), eq("testTask1"), eq(null), + eq(false), eq(BillingMode.PROVISIONED) ); } @@ -243,6 +247,7 @@ public void ifTaskIsStoppedPollDoesNothing() throws InterruptedException { @Test public void onInitSyncRunningPollReturnsScannedItemsBatch() throws InterruptedException { + LOGGER.debug("start onInitSyncRunningPollReturnsScannedItemsBatch"); // Arrange HashMap offset = new HashMap<>(); offset.put("table_name", tableName); @@ -875,4 +880,104 @@ public void onCommitIgnoreRecordsWithoutSequenceNumber() throws InterruptedExcep assertEquals("", shardRegister.get("shard1").getLastCommittedRecordSeqNo()); } + @Test + public void skippedInitSyncOnSyncPollReturnsReceivedRecords() throws InterruptedException { + // Arrange + HashMap offset = new HashMap<>(); + offset.put("table_name", tableName); + offset.put("init_sync_state", "SKIPPED"); + offset.put("init_sync_start", Instant.parse("2001-01-01T00:00:00.00Z").toEpochMilli()); + + KclRecordsWrapper dynamoDBRecords = new KclRecordsWrapper("testShardId1", new LinkedList<>()); + + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(tableName); + tableDescription.setKeySchema(Collections.singleton(new KeySchemaElement("col1", "S"))); + + Map row = new HashMap<>(); + row.put("col1", new AttributeValue("key1")); + row.put("col2", new AttributeValue("val1")); + row.put("col3", new AttributeValue().withN("1")); + List> initSyncRecords = Collections.singletonList(row); + + Map exclusiveStartKey = Collections.singletonMap("fake", new AttributeValue("key")); + + dynamoDBRecords.getRecords().add( + getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withS("key1")), + row, Instant.parse("2001-01-01T01:00:00.00Z"), + "1000000001", + "INSERT")); + dynamoDBRecords.getRecords().add( + getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withS("key2")), + null, Instant.parse("2001-01-01T01:00:00.00Z"), + "1000000002", + "REMOVE")); + dynamoDBRecords.getRecords().add( + getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withS("key1")), + row, Instant.parse("2001-01-01T01:00:00.00Z"), + "1000000003", + "INSERT")); + + DynamoDBSourceTask task = new SourceTaskBuilder() + .withOffset(offset) + .withTableDescription(tableDescription) + .withInitSyncRecords(initSyncRecords, exclusiveStartKey) + .withSyncRecords(Collections.singletonList(dynamoDBRecords)) + .buildTask(); + + // Act + task.start(configs); + List response = task.poll(); + + // Assert + assertEquals(4, response.size()); + assertEquals("{\"col2\":\"val1\",\"col3\":1,\"col1\":\"key1\"}", ((Struct) response.get(0).value()).getString("document")); + assertEquals("{\"col1\":\"key2\"}", ((Struct) response.get(1).value()).getString("document")); + assertNull(response.get(2).value()); // tombstone + } + @Test + public void onStartInitSyncSkipIsNotDelayed() throws InterruptedException { + // Arrange + configs.put("init.sync.delay.period", "2"); + + HashMap offset = new HashMap<>(); + offset.put("table_name", tableName); + offset.put("init_sync_state", "SKIPPED"); + offset.put("init_sync_start", Instant.parse("2001-01-01T00:00:00.00Z").toEpochMilli()); + + KclRecordsWrapper dynamoDBRecords = new KclRecordsWrapper("testShardId1", new LinkedList<>()); + + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(tableName); + tableDescription.setKeySchema(Collections.singleton(new KeySchemaElement("col1", "S"))); + + Map row = new HashMap<>(); + row.put("col1", new AttributeValue("key1")); + List> initSyncRecords = Collections.singletonList(row); + + dynamoDBRecords.getRecords().add( + getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withS("key1")), + row, Instant.parse("2001-01-01T01:01:00.00Z"), + "1000000001", + "INSERT")); + + DynamoDBSourceTask task = new SourceTaskBuilder() + .withOffset(offset) + .withClock(Clock.fixed(Instant.parse("2001-01-01T01:00:00.00Z"), ZoneId.of("UTC"))) + .withTableDescription(tableDescription) + .withInitSyncRecords(initSyncRecords, null) + .withSyncRecords(Collections.singletonList(dynamoDBRecords)) + .buildTask(); + + // Act + task.start(configs); + Instant start = Instant.now(); + List response = task.poll(); + Instant stop = Instant.now(); + + // Assert + assertTrue(Duration.between(start, stop).getSeconds() == 0); + assertEquals(0, task.getSourceInfo().initSyncCount); + assertEquals(1, response.size()); + } } diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImplTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImplTests.java index 12b7bdd..e2336b2 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImplTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/kcl/KclWorkerImplTests.java @@ -35,6 +35,7 @@ void initializationRegistersNewShardToRegistry() { String tableName = "testTableName1"; String taskId = "task1"; String serviceEndpoint = "http://localhost:8000"; + Boolean isSyncSkip = false; BillingMode kclTableBillingMode = BillingMode.PROVISIONED; AmazonDynamoDB dynamoDBClient = Mockito.mock(AmazonDynamoDB.class); @@ -43,7 +44,7 @@ void initializationRegistersNewShardToRegistry() { when(dynamoDBClient.describeTable(ArgumentMatchers.any())).thenReturn(result); // Act - KinesisClientLibConfiguration clientLibConfiguration = kclWorker.getClientLibConfiguration(tableName, taskId, dynamoDBClient, serviceEndpoint, kclTableBillingMode); + KinesisClientLibConfiguration clientLibConfiguration = kclWorker.getClientLibConfiguration(tableName, taskId, dynamoDBClient, serviceEndpoint, isSyncSkip, kclTableBillingMode); // Assert assertEquals("datalake-KCL-testTableName1", clientLibConfiguration.getApplicationName()); From 7665279292a799b63dc30ad22592f97a43d4ae9f Mon Sep 17 00:00:00 2001 From: Ryan Clark Date: Wed, 1 Mar 2023 19:03:25 -0600 Subject: [PATCH 3/6] sourceInfo skipInitSync method --- .../dynamodb/DynamoDBSourceTask.java | 7 +++-- .../connector/dynamodb/SourceInfo.java | 8 ++++++ .../dynamodb/DynamoDBSourceTaskTests.java | 27 +++++++++++++++++++ 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java index 98729cb..7e9a05a 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java @@ -89,6 +89,7 @@ public class DynamoDBSourceTask extends SourceTask { private SourceInfo sourceInfo; private TableDescription tableDesc; private int initSyncDelay; + private boolean initSyncSkip; @SuppressWarnings("unused") //Used by Confluent platform to initialize connector @@ -124,6 +125,7 @@ public void start(Map configProperties) { tableDesc = client.describeTable(config.getTableName()).getTable(); initSyncDelay = config.getInitSyncDelay(); + initSyncSkip = config.getInitSyncSkip(); LOGGER.debug("Getting offset for table: {}", tableDesc.getTableName()); setStateFromOffset(); @@ -169,6 +171,9 @@ private void setStateFromOffset() { .offset(Collections.singletonMap("table_name", tableDesc.getTableName())); if (offset != null) { sourceInfo = SourceInfo.fromOffset(offset, clock); + if (initSyncSkip) { + sourceInfo.skipInitSync(); + } } else { LOGGER.debug("No stored offset found for table: {}", tableDesc.getTableName()); sourceInfo = new SourceInfo(tableDesc.getTableName(), clock); @@ -223,8 +228,6 @@ public List poll() throws InterruptedException { * {@link SourceInfo}. */ private LinkedList initSync() throws Exception { - // TODO: remove log - LOGGER.info("init sync running"); if (sourceInfo.lastInitSyncStart.compareTo(Instant.now(clock).minus(Duration.ofHours(19))) <= 0) { LOGGER.error("Current INIT_SYNC took over 19 hours. Restarting INIT_SYNC! {}", sourceInfo); sourceInfo.startInitSync(); diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/SourceInfo.java b/source/src/main/java/com/trustpilot/connector/dynamodb/SourceInfo.java index c0f04d4..51c3162 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/SourceInfo.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/SourceInfo.java @@ -71,6 +71,14 @@ public void endInitSync() { lastInitSyncEnd = Instant.now(clock); } + public void skipInitSync() { + initSyncStatus = InitSyncStatus.SKIPPED; + lastInitSyncStart = Instant.ofEpochSecond(0); + lastInitSyncEnd = Instant.ofEpochSecond(0);; + exclusiveStartKey = null; + initSyncCount = 0L; + } + private static final Schema STRUCT_SCHEMA = SchemaBuilder.struct() .name(SchemaNameAdjuster .defaultAdjuster() diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index e179d9d..8b24372 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -880,9 +880,35 @@ public void onCommitIgnoreRecordsWithoutSequenceNumber() throws InterruptedExcep assertEquals("", shardRegister.get("shard1").getLastCommittedRecordSeqNo()); } + @Test + public void sourceInfoOfSkippedInitSyncIsLoadedFromOffsetOnStart() throws InterruptedException { + configs.put("init.sync.skip", "true"); + // Arrange + HashMap offset = new HashMap<>(); + offset.put("table_name", tableName); + offset.put("init_sync_state", "SKIPPED"); + offset.put("init_sync_start", Instant.parse("2001-01-02T00:00:00.00Z").toEpochMilli()); + + DynamoDBSourceTask task = new SourceTaskBuilder() + .withOffset(offset) + .buildTask(); + + // Act + task.start(configs); + + // Assert + SourceInfo sourceInfo = task.getSourceInfo(); + assertEquals(tableName, sourceInfo.tableName); + assertEquals(InitSyncStatus.SKIPPED, sourceInfo.initSyncStatus); + assertEquals(Instant.parse("1970-01-01T00:00:00Z"), sourceInfo.lastInitSyncStart); + assertEquals(Instant.parse("1970-01-01T00:00:00Z"), sourceInfo.lastInitSyncEnd); + } + @Test public void skippedInitSyncOnSyncPollReturnsReceivedRecords() throws InterruptedException { // Arrange + configs.put("init.sync.skip", "true"); + HashMap offset = new HashMap<>(); offset.put("table_name", tableName); offset.put("init_sync_state", "SKIPPED"); @@ -938,6 +964,7 @@ public void skippedInitSyncOnSyncPollReturnsReceivedRecords() throws Interrupted @Test public void onStartInitSyncSkipIsNotDelayed() throws InterruptedException { // Arrange + configs.put("init.sync.skip", "true"); configs.put("init.sync.delay.period", "2"); HashMap offset = new HashMap<>(); From b1f839c0ca076af23fc68b48bb3b6464427594a7 Mon Sep 17 00:00:00 2001 From: Ryan Clark Date: Thu, 2 Mar 2023 13:10:40 -0600 Subject: [PATCH 4/6] init sync flag to not restart if skipped --- .../dynamodb/DynamoDBSourceTaskTests.java | 50 +++++++++++++++++-- 1 file changed, 47 insertions(+), 3 deletions(-) diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index 8b24372..41fcdee 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -31,8 +31,6 @@ @SuppressWarnings("ConstantConditions") public class DynamoDBSourceTaskTests { private final static String tableName = "testTable1"; - // TODO: remove logger - private static final Logger LOGGER = LoggerFactory.getLogger(DynamoDBSourceTaskTests.class); private HashMap configs; @BeforeEach @@ -247,7 +245,6 @@ public void ifTaskIsStoppedPollDoesNothing() throws InterruptedException { @Test public void onInitSyncRunningPollReturnsScannedItemsBatch() throws InterruptedException { - LOGGER.debug("start onInitSyncRunningPollReturnsScannedItemsBatch"); // Arrange HashMap offset = new HashMap<>(); offset.put("table_name", tableName); @@ -1007,4 +1004,51 @@ public void onStartInitSyncSkipIsNotDelayed() throws InterruptedException { assertEquals(0, task.getSourceInfo().initSyncCount); assertEquals(1, response.size()); } + + @Test + public void onSyncPollInitSyncSkipReturnsNullAndDoesNotStartInitSyncIfAnyOneRecordEventArrivedTooLate() throws InterruptedException { + // Arrange + configs.put("init.sync.skip", "true"); + + HashMap offset = new HashMap<>(); + offset.put("table_name", tableName); + offset.put("init_sync_state", "SKIPPED"); + offset.put("init_sync_start", Instant.parse("2001-01-01T00:00:00.00Z").toEpochMilli()); + + KclRecordsWrapper dynamoDBRecords = new KclRecordsWrapper("testShardId1", new LinkedList<>()); + + TableDescription tableDescription = new TableDescription(); + tableDescription.setTableName(tableName); + tableDescription.setKeySchema(Collections.singleton(new KeySchemaElement("col1", "S"))); + + Map row = new HashMap<>(); + row.put("col1", new AttributeValue("key1")); + row.put("col2", new AttributeValue("val1")); + row.put("col3", new AttributeValue().withN("1")); + List> initSyncRecords = Collections.singletonList(row); + + Map exclusiveStartKey = Collections.singletonMap("fake", new AttributeValue("key")); + + dynamoDBRecords.getRecords().add(getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withN("key1")), + row, Instant.parse("2001-01-03T15:00:00.00Z"), "s1", "INSERT")); + dynamoDBRecords.getRecords().add(getRecordAdapter(Collections.singletonMap("col1", new AttributeValue().withN("key1")), + row, Instant.parse("2001-01-03T00:00:00.00Z"), "s2", "INSERT")); + + DynamoDBSourceTask task = new SourceTaskBuilder() + .withOffset(offset) + .withClock(Clock.fixed(Instant.parse("2001-01-03T20:00:00.00Z"), ZoneId.of("UTC"))) + .withTableDescription(tableDescription) + .withInitSyncRecords(initSyncRecords, exclusiveStartKey) + .withSyncRecords(Collections.singletonList(dynamoDBRecords)) + .buildTask(); + + // Act + task.start(configs); + List response = task.poll(); + + // Assert + assertEquals(2, response.size()); + assertEquals(0, task.getSourceInfo().initSyncCount); + assertEquals(InitSyncStatus.SKIPPED, task.getSourceInfo().initSyncStatus); + } } From 1bf58c45d961cd623cf64786e32546db94060e96 Mon Sep 17 00:00:00 2001 From: Ryan Clark Date: Fri, 3 Mar 2023 13:49:52 -0600 Subject: [PATCH 5/6] changed offset time of init sync sourceInfo --- .../trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index 41fcdee..3e067f5 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -884,7 +884,7 @@ public void sourceInfoOfSkippedInitSyncIsLoadedFromOffsetOnStart() throws Interr HashMap offset = new HashMap<>(); offset.put("table_name", tableName); offset.put("init_sync_state", "SKIPPED"); - offset.put("init_sync_start", Instant.parse("2001-01-02T00:00:00.00Z").toEpochMilli()); + offset.put("init_sync_start", Instant.parse("1970-01-01T00:00:00Z").toEpochMilli()); DynamoDBSourceTask task = new SourceTaskBuilder() .withOffset(offset) From 3e63b7364293554faa1cad7ad0e0bf058f6eeee8 Mon Sep 17 00:00:00 2001 From: Ryan Clark Date: Tue, 7 Mar 2023 18:55:37 -0600 Subject: [PATCH 6/6] remove comment --- .../com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java | 2 +- .../trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java index 7e9a05a..10a0c0e 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTask.java @@ -177,7 +177,7 @@ private void setStateFromOffset() { } else { LOGGER.debug("No stored offset found for table: {}", tableDesc.getTableName()); sourceInfo = new SourceInfo(tableDesc.getTableName(), clock); - sourceInfo.startInitSync(); // InitSyncStatus always needs to run after adding new table + sourceInfo.startInitSync(); } } diff --git a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java index 3e067f5..ea01058 100644 --- a/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java +++ b/source/src/test/java/com/trustpilot/connector/dynamodb/DynamoDBSourceTaskTests.java @@ -15,8 +15,6 @@ import org.mockito.ArgumentMatchers; import org.mockito.Mockito; import org.mockito.stubbing.Answer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.time.Clock; import java.time.Duration;