Sanitises invalid Avro field names (#9)
Avro is [quite restrictive in what characters are allowed when defining a field](https://avro.apache.org/docs/current/spec.html#names), but DynamoDB [happily allows characters](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html) such as `-` (dash) or `.` (dot).

We updated the connector's code to sanitise field names so that they match Avro's naming convention, replacing any invalid character with `_` (underscore); see the sketch below.

https://trello.com/c/Dhv4FIod/1198-dynamodb-kafka-connector-sanitise-invalid-field-names
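
For illustration, a minimal standalone sketch of the new sanitisation rule (the `SanitiseExample` class, `main` method, and the `user.email` attribute name are only for demonstration; the connector applies the same regex when converting records):

```java
public class SanitiseExample {

    // Same regex as this change: the first character must be a letter or underscore,
    // every following character a letter, digit or underscore; anything else is
    // replaced with '_'.
    static String sanitise(String attributeName) {
        return attributeName.replaceAll("^[^a-zA-Z_]|(?<!^)[^a-zA-Z0-9_]", "_");
    }

    public static void main(String[] args) {
        System.out.println(sanitise("test-1234"));            // test_1234
        System.out.println(sanitise("user.email"));           // user_email
        System.out.println(sanitise("1-starts-with-number")); // __starts_with_number
    }
}
```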

Co-authored-by: Emilio Larrambebere <[email protected]>
TPRobots and emilio-larrambebere-TP authored May 12, 2021
1 parent c35e7a5 commit a1e993b
Showing 2 changed files with 108 additions and 3 deletions.
@@ -14,8 +14,10 @@

import java.time.Instant;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toList;

@@ -62,6 +64,15 @@ public SourceRecord toSourceRecord(
String shardId,
String sequenceNumber) throws Exception {

// Sanitise the incoming attributes to remove any invalid Avro characters
final Map<String, AttributeValue> sanitisedAttributes = attributes.entrySet().stream()
.collect(Collectors.toMap(
e -> this.sanitiseAttributeName(e.getKey()),
Map.Entry::getValue,
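// if two attribute names sanitise to the same key, keep the first value seen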
(u, v) -> u,
LinkedHashMap::new
));

// Leveraging offsets to store shard and sequence number with each item pushed to Kafka.
// This info will only be used to update `shardRegister` and won't be used to reset state after restart
Map<String, Object> offsets = SourceInfo.toOffset(sourceInfo);
@@ -70,13 +81,13 @@ public SourceRecord toSourceRecord(

// DynamoDB keys can be changed only by recreating the table
if (keySchema == null) {
- keys = tableDesc.getKeySchema().stream().map(KeySchemaElement::getAttributeName).collect(toList());
+ keys = tableDesc.getKeySchema().stream().map(this::sanitiseAttributeName).collect(toList());
keySchema = getKeySchema(keys);
}

Struct keyData = new Struct(getKeySchema(keys));
for (String key : keys) {
- AttributeValue attributeValue = attributes.get(key);
+ AttributeValue attributeValue = sanitisedAttributes.get(key);
if (attributeValue.getS() != null) {
keyData.put(key, attributeValue.getS());
continue;
@@ -89,7 +100,7 @@

Struct valueData = new Struct(valueSchema)
.put(Envelope.FieldName.VERSION, sourceInfo.version)
- .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(attributes))
+ .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(sanitisedAttributes))
.put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo))
.put(Envelope.FieldName.OPERATION, op.code())
.put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli());
@@ -113,4 +124,11 @@ private Schema getKeySchema(List<String> keys) {
return keySchemaBuilder.build();
}

private String sanitiseAttributeName(KeySchemaElement element) {
return this.sanitiseAttributeName(element.getAttributeName());
}

private String sanitiseAttributeName(final String attributeName) {
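// replace every character that is not valid in an Avro name with '_':
// the first character must be a letter or '_', later characters a letter, digit or '_'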
return attributeName.replaceAll("^[^a-zA-Z_]|(?<!^)[^a-zA-Z0-9_]", "_");
}
}
@@ -52,6 +52,18 @@ private Map<String, AttributeValue> getAttributes() {
return attributes;
}

private Map<String, AttributeValue> getAttributesWithInvalidAvroCharacters() {
Map<String, AttributeValue> attributes = new HashMap<>();
attributes.put("test-1234", new AttributeValue().withS("testKV1Value"));
attributes.put("1-starts-with-number", new AttributeValue().withS("2"));
attributes.put("_starts_with_underscore", new AttributeValue().withN("1"));
attributes.put("test!@£$%^", new AttributeValue().withS("testStringValue"));

return attributes;
}

private SourceInfo getSourceInfo(String table) {
SourceInfo sourceInfo = new SourceInfo(table, Clock.fixed(Instant.parse("2001-01-02T00:00:00Z"), ZoneId.of("UTC")));
sourceInfo.initSyncStatus = InitSyncStatus.RUNNING;
@@ -191,6 +203,81 @@ public void recordAttributesAreAddedToValueData() throws Exception {
((Struct) record.value()).getString("document"));
}

@Test
public void singleItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throws Exception {
// Arrange
List<KeySchemaElement> keySchema = new LinkedList<>();
keySchema.add(new KeySchemaElement().withKeyType("S").withAttributeName("test-1234"));

RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix-");

// Act
SourceRecord record = converter.toSourceRecord(
getSourceInfo(table),
Envelope.Operation.forCode("r"),
getAttributesWithInvalidAvroCharacters(),
Instant.parse("2001-01-02T00:00:00.00Z"),
"testShardID1",
"testSequenceNumberID1"
);

// Assert
assertEquals("test_1234", record.keySchema().fields().get(0).name());
assertEquals(SchemaBuilder.string().build(), record.keySchema().fields().get(0).schema());
assertEquals("testKV1Value", ((Struct) record.key()).getString("test_1234"));
}

@Test
public void multiItemKeyIsAddedToRecordWhenKeyContainsInvalidCharacters() throws Exception {
// Arrange
List<KeySchemaElement> keySchema = new LinkedList<>();
keySchema.add(new KeySchemaElement().withKeyType("S").withAttributeName("test-1234"));
keySchema.add(new KeySchemaElement().withKeyType("N").withAttributeName("1-starts-with-number"));

RecordConverter converter = new RecordConverter(getTableDescription(keySchema), "TestTopicPrefix-");

// Act
SourceRecord record = converter.toSourceRecord(
getSourceInfo(table),
Envelope.Operation.forCode("r"),
getAttributesWithInvalidAvroCharacters(),
Instant.parse("2001-01-02T00:00:00.00Z"),
"testShardID1",
"testSequenceNumberID1"
);

// Assert
assertEquals("test_1234", record.keySchema().fields().get(0).name());
assertEquals(SchemaBuilder.string().build(), record.keySchema().fields().get(0).schema());
assertEquals("testKV1Value", ((Struct) record.key()).getString("test_1234"));

assertEquals("__starts_with_number", record.keySchema().fields().get(1).name());
assertEquals(SchemaBuilder.string().build(), record.keySchema().fields().get(1).schema());
assertEquals("2", ((Struct) record.key()).getString("__starts_with_number"));
}

@Test
public void recordAttributesAreAddedToValueDataWhenAttributesContainsInvalidCharacters() throws Exception {
// Arrange
RecordConverter converter = new RecordConverter(getTableDescription(null), "TestTopicPrefix-");

// Act
SourceRecord record = converter.toSourceRecord(
getSourceInfo(table),
Envelope.Operation.forCode("r"),
getAttributesWithInvalidAvroCharacters(),
Instant.parse("2001-01-02T00:00:00.00Z"),
"testShardID1",
"testSequenceNumberID1"
);

String expected = "{\"test_1234\":{\"s\":\"testKV1Value\"},\"_starts_with_underscore\":{\"n\":\"1\"},\"__starts_with_number\":{\"s\":\"2\"},\"test______\":{\"s\":\"testStringValue\"}}";

// Assert
assertEquals(expected,
((Struct) record.value()).getString("document"));
}

@Test
public void sourceInfoIsAddedToValueData() throws Exception {
// Arrange
