Skip to content

Commit

Permalink
Merge pull request #4 from fetch-rewards/unMarshall-Dynamo-Documents
Browse files Browse the repository at this point in the history
Un marshall dynamo documents
  • Loading branch information
gurjit-sandhu authored Sep 12, 2022
2 parents cabd9f2 + 90e852b commit 635aa53
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.TableDescription;
import com.amazonaws.services.dynamodbv2.document.ItemUtils;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -74,6 +75,9 @@ public SourceRecord toSourceRecord(
LinkedHashMap::new
));

// getUnmarshallItems from Dynamo Document
Map<String, Object> unMarshalledItems = ItemUtils.toSimpleMapValue(attributes);

// Leveraging offsets to store shard and sequence number with each item pushed to Kafka.
// This info will only be used to update `shardRegister` and won't be used to reset state after restart
Map<String, Object> offsets = SourceInfo.toOffset(sourceInfo);
Expand Down Expand Up @@ -101,7 +105,7 @@ public SourceRecord toSourceRecord(

Struct valueData = new Struct(valueSchema)
.put(Envelope.FieldName.VERSION, sourceInfo.version)
.put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(sanitisedAttributes))
.put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(unMarshalledItems))
.put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo))
.put(Envelope.FieldName.OPERATION, op.code())
.put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public void onInitSyncRunningPollReturnsScannedItemsBatch() throws InterruptedEx

assertEquals(1, response.size());
assertEquals("r", ((Struct) response.get(0).value()).getString("op"));
assertEquals("{\"col2\":{\"s\":\"val1\"},\"col3\":{\"n\":\"1\"},\"col1\":{\"s\":\"key1\"}}", ((Struct) response.get(0).value()).getString("document"));
assertEquals(({"col2":"val1","col3":1,"col1":"key1"}), ((Struct) response.get(0).value()).getString("document"));
assertEquals(InitSyncStatus.RUNNING, task.getSourceInfo().initSyncStatus);
assertEquals(exclusiveStartKey, task.getSourceInfo().exclusiveStartKey);
}
Expand Down Expand Up @@ -560,8 +560,8 @@ public void onSyncPollReturnsReceivedRecords() throws InterruptedException {

// Assert
assertEquals(3, response.size());
assertEquals("{\"col2\":{\"s\":\"val1\"},\"col3\":{\"n\":\"1\"},\"col1\":{\"s\":\"key1\"}}", ((Struct) response.get(0).value()).getString("document"));
assertEquals("{\"col1\":{\"s\":\"key2\"}}", ((Struct) response.get(1).value()).getString("document"));
assertEquals("{\"col2\":\"val1\",\"col3\":1,\"col1\":\"key1\"}", ((Struct) response.get(0).value()).getString("document"));
assertEquals("{\"col1\":\"key2\"}", ((Struct) response.get(1).value()).getString("document"));
assertNull(response.get(2).value()); // tombstone
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public void recordAttributesAreAddedToValueData() throws Exception {
);

// Assert
assertEquals("{\"testKV1\":{\"s\":\"testKV1Value\"},\"testKV2\":{\"s\":\"2\"},\"testV2\":{\"s\":\"testStringValue\"},\"testV1\":{\"n\":\"1\"}}",
assertEquals("{\"testKV1\":\"testKV1Value\",\"testKV2\":\"2\",\"testV2\":\"testStringValue\",\"testV1\":1}",
((Struct) record.value()).getString("document"));
}

Expand Down Expand Up @@ -271,7 +271,7 @@ public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidChar
"testSequenceNumberID1"
);

String expected = "{\"test1234\":{\"s\":\"testKV1Value\"},\"_starts_with_underscore\":{\"n\":\"1\"},\"startswithnumber\":{\"s\":\"2\"},\"test\":{\"s\":\"testStringValue\"}}";
String expected = "{\"test-1234\":\"testKV1Value\",\"_starts_with_underscore\":1,\"1-starts-with-number\":\"2\",\"test!@£$%^\":\"testStringValue\"}";

// Assert
assertEquals(expected,
Expand Down

0 comments on commit 635aa53

Please sign in to comment.