From c1c4eb4a324f76376d59c3d558e0e64a28d32940 Mon Sep 17 00:00:00 2001 From: Sam Corzine Date: Fri, 16 Jun 2023 11:57:04 -0500 Subject: [PATCH 1/2] make field optional --- .../trustpilot/connector/dynamodb/utils/RecordConverter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index d5285ff..673ac04 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -52,7 +52,7 @@ public RecordConverter(TableDescription tableDesc, String topicNamePrefix) { .name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope")) .field(Envelope.FieldName.VERSION, Schema.STRING_SCHEMA) .field(Envelope.FieldName.DOCUMENT, DynamoDbJson.schema()) - .field(Envelope.FieldName.OLD_DOCUMENT, DynamoDbJson.schema()) + .field(Envelope.FieldName.OLD_DOCUMENT, DynamoDbJson.builder().optional().build()) .field(Envelope.FieldName.SOURCE, SourceInfo.structSchema()) .field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA) .field(Envelope.FieldName.TIMESTAMP, Schema.INT64_SCHEMA) From 8b4f28e7ae3874a05b815f2b021390466983bacd Mon Sep 17 00:00:00 2001 From: Sam Corzine Date: Fri, 16 Jun 2023 12:30:58 -0500 Subject: [PATCH 2/2] handle null better --- .../connector/dynamodb/utils/RecordConverter.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index 673ac04..5bc17d3 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -109,11 +109,16 @@ public SourceRecord toSourceRecord( Struct valueData = new Struct(valueSchema) .put(Envelope.FieldName.VERSION, sourceInfo.version) .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(unMarshalledItems)) - .put(Envelope.FieldName.OLD_DOCUMENT, objectMapper.writeValueAsString(unMarshalledOldItems)) .put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo)) .put(Envelope.FieldName.OPERATION, op.code()) .put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli()); + if (unMarshalledOldItems == null) { + valueData.put(Envelope.FieldName.OLD_DOCUMENT, null); + } else { + valueData.put(Envelope.FieldName.OLD_DOCUMENT, objectMapper.writeValueAsString(unMarshalledOldItems)); + } + return new SourceRecord( Collections.singletonMap("table_name", sourceInfo.tableName), offsets,