Skip to content

Commit

Permalink
Merge pull request #5 from fetch-rewards/convert-dyanmo-documents-to-…
Browse files Browse the repository at this point in the history
…JSON

updating format to JSON before pushing records to kafka
  • Loading branch information
gurjit-sandhu authored Sep 13, 2022
2 parents 635aa53 + f6ed6e0 commit 679b092
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@
import java.util.Map;
import java.util.stream.Collectors;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParser;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;

import static java.util.stream.Collectors.toList;

/**
Expand Down Expand Up @@ -76,7 +82,19 @@ public SourceRecord toSourceRecord(
));

// getUnmarshallItems from Dynamo Document
Map<String, Object> unMarshalledItems = ItemUtils.toSimpleMapValue(attributes);
//Map<String, Object> unMarshalledItems = ItemUtils.toSimpleMapValue(attributes);

//JSON conversion
String outputJsonString = null;
try {
String jsonString = ItemUtils.toItem(attributes).toJSON();
JsonObject jsonObject = new JsonParser().parse(jsonString).getAsJsonObject();
Gson gson = new GsonBuilder().setPrettyPrinting().create();
outputJsonString = gson.toJson(jsonObject);
} catch (JsonParseException e) {
e.printStackTrace();
throw new Exception("Error Occured in JSON Parsing " + e.getMessage(), e);
}

// Leveraging offsets to store shard and sequence number with each item pushed to Kafka.
// This info will only be used to update `shardRegister` and won't be used to reset state after restart
Expand Down Expand Up @@ -105,7 +123,7 @@ public SourceRecord toSourceRecord(

Struct valueData = new Struct(valueSchema)
.put(Envelope.FieldName.VERSION, sourceInfo.version)
.put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(unMarshalledItems))
.put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(outputJsonString))
.put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo))
.put(Envelope.FieldName.OPERATION, op.code())
.put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

Expand Down Expand Up @@ -272,13 +275,21 @@ public void onInitSyncRunningPollReturnsScannedItemsBatch() throws InterruptedEx
task.start(configs);
List<SourceRecord> response = task.poll();

String expected = "\"{\\n \\\"col2\\\": \\\"val1\\\",\\n \\\"col3\\\": 1,\\n \\\"col1\\\": \\\"key1\\\"\\n}\"";
String actual = (((Struct) response.get(0).value()).getString("document"));

// Converting both expected and actual to JSON string
Gson gson = new GsonBuilder().setPrettyPrinting().create();
expected = gson.toJson(expected);
actual = gson.toJson(actual);

// Assert
assertEquals(Instant.parse("2001-01-01T00:00:00.00Z"), task.getSourceInfo().lastInitSyncStart);
assertEquals(1, task.getSourceInfo().initSyncCount);

assertEquals(1, response.size());
assertEquals("r", ((Struct) response.get(0).value()).getString("op"));
assertEquals(({"col2":"val1","col3":1,"col1":"key1"}), ((Struct) response.get(0).value()).getString("document"));
assertEquals(expected , actual);
assertEquals(InitSyncStatus.RUNNING, task.getSourceInfo().initSyncStatus);
assertEquals(exclusiveStartKey, task.getSourceInfo().exclusiveStartKey);
}
Expand Down Expand Up @@ -557,11 +568,23 @@ public void onSyncPollReturnsReceivedRecords() throws InterruptedException {
// Act
task.start(configs);
List<SourceRecord> response = task.poll();

String expected = "\"{\\n \\\"col2\\\": \\\"val1\\\",\\n \\\"col3\\\": 1,\\n \\\"col1\\\": \\\"key1\\\"\\n}\"";
String expected_document_key = "\"{\\n \\\"col1\\\": \\\"key2\\\"\\n}\"";
String actual = (((Struct) response.get(0).value()).getString("document"));
String actual_document_key = ((Struct) response.get(1).value()).getString("document");

// Converting both expected and actual to JSON string
Gson gson = new GsonBuilder().setPrettyPrinting().create();
expected = gson.toJson(expected);
expected_document_key = gson.toJson(expected_document_key);
actual = gson.toJson(actual);
actual_document_key = gson.toJson(actual_document_key);

// Assert
assertEquals(3, response.size());
assertEquals("{\"col2\":\"val1\",\"col3\":1,\"col1\":\"key1\"}", ((Struct) response.get(0).value()).getString("document"));
assertEquals("{\"col1\":\"key2\"}", ((Struct) response.get(1).value()).getString("document"));
assertEquals(expected, actual);
assertEquals(expected_document_key, actual_document_key);
assertNull(response.get(2).value()); // tombstone
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@



package com.trustpilot.connector.dynamodb.utils;

import com.amazonaws.services.dynamodbv2.model.AttributeValue;
Expand All @@ -22,6 +19,9 @@
import java.util.List;
import java.util.Map;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import static org.junit.jupiter.api.Assertions.assertEquals;


Expand Down Expand Up @@ -198,9 +198,16 @@ public void recordAttributesAreAddedToValueData() throws Exception {
"testSequenceNumberID1"
);

String expected = "\"{\\n \\\"testKV1\\\": \\\"testKV1Value\\\",\\n \\\"testKV2\\\": \\\"2\\\",\\n \\\"testV2\\\": \\\"testStringValue\\\",\\n \\\"testV1\\\": 1\\n}\"";
String actual = ((Struct) record.value()).getString("document");

// Converting both expected and actual to JSON string
Gson gson = new GsonBuilder().setPrettyPrinting().create();
expected = gson.toJson(expected);
actual = gson.toJson(actual);

// Assert
assertEquals("{\"testKV1\":\"testKV1Value\",\"testKV2\":\"2\",\"testV2\":\"testStringValue\",\"testV1\":1}",
((Struct) record.value()).getString("document"));
assertEquals(expected, actual);
}

@Test
Expand Down Expand Up @@ -271,11 +278,16 @@ public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidChar
"testSequenceNumberID1"
);

String expected = "{\"test-1234\":\"testKV1Value\",\"_starts_with_underscore\":1,\"1-starts-with-number\":\"2\",\"test!@£$%^\":\"testStringValue\"}";
String expected = "\"{\\n \\\"test-1234\\\": \\\"testKV1Value\\\",\\n \\\"_starts_with_underscore\\\": 1,\\n \\\"1-starts-with-number\\\": \\\"2\\\",\\n \\\"test!@£$%^\\\": \\\"testStringValue\\\"\\n}\"";
String actual = ((Struct) record.value()).getString("document");

// Converting both expected and actual to JSON string
Gson gson = new GsonBuilder().setPrettyPrinting().create();
expected = gson.toJson(expected);
actual = gson.toJson(actual);

// Assert
assertEquals(expected,
((Struct) record.value()).getString("document"));
assertEquals(expected, actual);
}

@Test
Expand Down

0 comments on commit 679b092

Please sign in to comment.