diff --git a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java index d5285ff..5bc17d3 100644 --- a/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java +++ b/source/src/main/java/com/trustpilot/connector/dynamodb/utils/RecordConverter.java @@ -52,7 +52,7 @@ public RecordConverter(TableDescription tableDesc, String topicNamePrefix) { .name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope")) .field(Envelope.FieldName.VERSION, Schema.STRING_SCHEMA) .field(Envelope.FieldName.DOCUMENT, DynamoDbJson.schema()) - .field(Envelope.FieldName.OLD_DOCUMENT, DynamoDbJson.schema()) + .field(Envelope.FieldName.OLD_DOCUMENT, DynamoDbJson.builder().optional().build()) .field(Envelope.FieldName.SOURCE, SourceInfo.structSchema()) .field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA) .field(Envelope.FieldName.TIMESTAMP, Schema.INT64_SCHEMA) @@ -109,11 +109,16 @@ public SourceRecord toSourceRecord( Struct valueData = new Struct(valueSchema) .put(Envelope.FieldName.VERSION, sourceInfo.version) .put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(unMarshalledItems)) - .put(Envelope.FieldName.OLD_DOCUMENT, objectMapper.writeValueAsString(unMarshalledOldItems)) .put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo)) .put(Envelope.FieldName.OPERATION, op.code()) .put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli()); + if (unMarshalledOldItems == null) { + valueData.put(Envelope.FieldName.OLD_DOCUMENT, null); + } else { + valueData.put(Envelope.FieldName.OLD_DOCUMENT, objectMapper.writeValueAsString(unMarshalledOldItems)); + } + return new SourceRecord( Collections.singletonMap("table_name", sourceInfo.tableName), offsets,