Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Un marshall dynamo documents #9

Merged
merged 3 commits into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toList;
Expand All @@ -41,14 +40,22 @@ public class RecordConverter {
private final TableDescription tableDesc;
private final String topic_name;
private Schema keySchema;
private Schema valueSchema;
private final Schema valueSchema;

private List<String> keys;

public RecordConverter(TableDescription tableDesc, String topicNamePrefix) {
this.tableDesc = tableDesc;
this.topic_name = topicNamePrefix;
this.topic_name = topicNamePrefix;

valueSchema = SchemaBuilder.struct()
.name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope"))
.field(Envelope.FieldName.VERSION, Schema.STRING_SCHEMA)
.field(Envelope.FieldName.DOCUMENT, DynamoDbJson.schema())
.field(Envelope.FieldName.SOURCE, SourceInfo.structSchema())
.field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA)
.field(Envelope.FieldName.TIMESTAMP, Schema.INT64_SCHEMA)
.build();
}

public SourceRecord toSourceRecord(
Expand All @@ -69,20 +76,7 @@ public SourceRecord toSourceRecord(
));

// getUnmarshallItems from Dynamo Document
//Map<String, Object> unMarshalledItems = ItemUtils.toSimpleMapValue(attributes);

//JSON conversion
//String outputJsonString = ItemUtils.toItem(attributes).toJSON();
Struct dynamoAttributes = getAttributeValueStruct(sanitisedAttributes);

valueSchema = SchemaBuilder.struct()
.name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope"))
.field(Envelope.FieldName.VERSION, Schema.STRING_SCHEMA)
.field(Envelope.FieldName.DOCUMENT, getAttributeValueSchema(sanitisedAttributes))
.field(Envelope.FieldName.SOURCE, SourceInfo.structSchema())
.field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA)
.field(Envelope.FieldName.TIMESTAMP, Schema.INT64_SCHEMA)
.build();
Map<String, Object> unMarshalledItems = ItemUtils.toSimpleMapValue(attributes);

// Leveraging offsets to store shard and sequence number with each item pushed to Kafka.
// This info will only be used to update `shardRegister` and won't be used to reset state after restart
Expand Down Expand Up @@ -111,7 +105,7 @@ public SourceRecord toSourceRecord(

Struct valueData = new Struct(valueSchema)
.put(Envelope.FieldName.VERSION, sourceInfo.version)
.put(Envelope.FieldName.DOCUMENT, dynamoAttributes) // objectMapper.writeValueAsString(outputJsonString))
.put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(unMarshalledItems))
.put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo))
.put(Envelope.FieldName.OPERATION, op.code())
.put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli());
Expand Down Expand Up @@ -148,65 +142,4 @@ private String sanitiseAttributeName(final String attributeName) {

return sanitisedAttributeName;
}

public static Struct getAttributeValueStruct(Map<String, AttributeValue> attributes) {
final Struct attributeValueStruct = new Struct(getAttributeValueSchema(attributes));

// Mapping dynamo db attributes to schema registry types (dynamo db attributes are documented at below link)
//https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-dynamodb/src/main/java/com/amazonaws/services/dynamodbv2/model/AttributeValue.java

for (Map.Entry<String, AttributeValue> attribute : attributes.entrySet()) {
final String attributeName = attribute.getKey();
final AttributeValue attributeValue = attribute.getValue();
if (attributeValue.getS() != null) {
attributeValueStruct.put(attributeName, attributeValue.getS());
} else if (attributeValue.getN() != null) {
attributeValueStruct.put(attributeName, attributeValue.getN());
} else if (attributeValue.getB() != null) {
attributeValueStruct.put(attributeName, attributeValue.getB());
} else if (attributeValue.getSS() != null) {
attributeValueStruct.put(attributeName, attributeValue.getSS());
} else if (attributeValue.getNS() != null) {
attributeValueStruct.put(attributeName, attributeValue.getNS());
} else if (attributeValue.getBS() != null) {
attributeValueStruct.put(attributeName, attributeValue.getBS());
} else if (attributeValue.getNULL() != null) {
attributeValueStruct.put(attributeName, attributeValue.getNULL());
} else if (attributeValue.getBOOL() != null) {
attributeValueStruct.put(attributeName, attributeValue.getBOOL());
}
}
return attributeValueStruct;
}

public static Schema getAttributeValueSchema(Map<String, AttributeValue> attributes) {
SchemaBuilder RECORD_ATTRIBUTES_SCHEMA = SchemaBuilder.struct().name("DynamoDB.AttributeValue");

// Mapping dynamo db attributes to schema registry types (dynamo db attributes are documented at below link)
//https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-dynamodb/src/main/java/com/amazonaws/services/dynamodbv2/model/AttributeValue.java

for (Map.Entry<String, AttributeValue> attribute : attributes.entrySet()) {
final String attributeName = attribute.getKey();
final AttributeValue attributeValue = attribute.getValue();
if (attributeValue.getS() != null) {
RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.STRING_SCHEMA);
} else if (attributeValue.getN() != null) {
RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.STRING_SCHEMA);
} else if (attributeValue.getB() != null) {
RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.BYTES_SCHEMA);
} else if (attributeValue.getSS() != null) {
RECORD_ATTRIBUTES_SCHEMA.field(attributeName, SchemaBuilder.array(Schema.STRING_SCHEMA));
} else if (attributeValue.getNS() != null) {
RECORD_ATTRIBUTES_SCHEMA.field(attributeName, SchemaBuilder.array(Schema.STRING_SCHEMA));
} else if (attributeValue.getBS() != null) {
RECORD_ATTRIBUTES_SCHEMA.field(attributeName, SchemaBuilder.array(Schema.BYTES_SCHEMA));
} else if (attributeValue.getNULL() != null) {
RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.BOOLEAN_SCHEMA);
} else if (attributeValue.getBOOL() != null) {
RECORD_ATTRIBUTES_SCHEMA.field(attributeName, Schema.BOOLEAN_SCHEMA);
}
}
return RECORD_ATTRIBUTES_SCHEMA.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

Expand Down Expand Up @@ -280,22 +276,9 @@ public void onInitSyncRunningPollReturnsScannedItemsBatch() throws InterruptedEx
assertEquals(Instant.parse("2001-01-01T00:00:00.00Z"), task.getSourceInfo().lastInitSyncStart);
assertEquals(1, task.getSourceInfo().initSyncCount);

final Schema expectedDocumentSchema = SchemaBuilder.struct().name("DynamoDB.AttributeValue")
.field("col2", Schema.STRING_SCHEMA)
.field("col3", Schema.STRING_SCHEMA)
.field("col1", Schema.STRING_SCHEMA)
.build();

final Struct expectedDocument = new Struct(expectedDocumentSchema)
.put("col2","val1")
.put("col3","1")
.put("col1","key1");

Struct actualDocument = ((Struct) response.get(0).value()).getStruct("document") ;

assertEquals(1, response.size());
assertEquals("r", ((Struct) response.get(0).value()).getString("op"));
compareStructs(expectedDocument, actualDocument);
assertEquals("r", ((Struct) response.get(0).value()).getString("op"));
assertEquals(("{\"col2\":\"val1\",\"col3\":1,\"col1\":\"key1\"}"), ((Struct) response.get(0).value()).getString("document"));
assertEquals(InitSyncStatus.RUNNING, task.getSourceInfo().initSyncStatus);
assertEquals(exclusiveStartKey, task.getSourceInfo().exclusiveStartKey);
}
Expand Down Expand Up @@ -575,30 +558,10 @@ public void onSyncPollReturnsReceivedRecords() throws InterruptedException {
task.start(configs);
List<SourceRecord> response = task.poll();

final Schema expectedDocumentSchema = SchemaBuilder.struct().name("DynamoDB.AttributeValue")
.field("col2", Schema.STRING_SCHEMA)
.field("col3", Schema.STRING_SCHEMA)
.field("col1", Schema.STRING_SCHEMA)
.build();
final Struct expectedDocument = new Struct(expectedDocumentSchema)
.put("col2","val1")
.put("col3","1")
.put("col1","key1");

final Schema expectedDocumentColSchema = SchemaBuilder.struct().name("DynamoDB.AttributeValue")
.field("col1", Schema.STRING_SCHEMA)
.build();

final Struct expectedDocColValue = new Struct(expectedDocumentColSchema)
.put("col1","key2");

Struct actualDocument = ((Struct) response.get(0).value()).getStruct("document") ;
Struct actualDocumentCol = ((Struct) response.get(1).value()).getStruct("document");

// Assert
assertEquals(3, response.size());
compareStructs(expectedDocument, actualDocument);
compareStructs(expectedDocColValue, actualDocumentCol);
assertEquals(3, response.size());
assertEquals("{\"col2\":\"val1\",\"col3\":1,\"col1\":\"key1\"}", ((Struct) response.get(0).value()).getString("document"));
assertEquals("{\"col1\":\"key2\"}", ((Struct) response.get(1).value()).getString("document"));
assertNull(response.get(2).value()); // tombstone
}

Expand Down Expand Up @@ -912,18 +875,4 @@ public void onCommitIgnoreRecordsWithoutSequenceNumber() throws InterruptedExcep
assertEquals("", shardRegister.get("shard1").getLastCommittedRecordSeqNo());
}

public void compareStructs(Struct expectedStruct , Struct actualStruct) {

// comparing schema for both struct
if (!Objects.equals(expectedStruct.schema(), actualStruct.schema())) {
fail("Schema expected " + expectedStruct.schema().fields() + " but actual " + actualStruct.schema().fields());
}

// comparing all fields for both struct
for (Field expectedFieldName : expectedStruct.schema().fields()) {
Field actualFieldName = actualStruct.schema().field(expectedFieldName.name());
assertEquals(expectedStruct.get(expectedFieldName), actualStruct.get(actualFieldName));
}

}
}
}
Loading