Skip to content

Commit

Permalink
Merge pull request #14 from fetch-rewards/hotfix/fix-schema
Browse files Browse the repository at this point in the history
Optional Field
  • Loading branch information
samcorzineatfetch authored Jun 16, 2023
2 parents d6f86b6 + 8b4f28e commit f920119
Showing 1 changed file with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public RecordConverter(TableDescription tableDesc, String topicNamePrefix) {
.name(SchemaNameAdjuster.DEFAULT.adjust( "com.trustpilot.connector.dynamodb.envelope"))
.field(Envelope.FieldName.VERSION, Schema.STRING_SCHEMA)
.field(Envelope.FieldName.DOCUMENT, DynamoDbJson.schema())
.field(Envelope.FieldName.OLD_DOCUMENT, DynamoDbJson.schema())
.field(Envelope.FieldName.OLD_DOCUMENT, DynamoDbJson.builder().optional().build())
.field(Envelope.FieldName.SOURCE, SourceInfo.structSchema())
.field(Envelope.FieldName.OPERATION, Schema.STRING_SCHEMA)
.field(Envelope.FieldName.TIMESTAMP, Schema.INT64_SCHEMA)
Expand Down Expand Up @@ -109,11 +109,16 @@ public SourceRecord toSourceRecord(
Struct valueData = new Struct(valueSchema)
.put(Envelope.FieldName.VERSION, sourceInfo.version)
.put(Envelope.FieldName.DOCUMENT, objectMapper.writeValueAsString(unMarshalledItems))
.put(Envelope.FieldName.OLD_DOCUMENT, objectMapper.writeValueAsString(unMarshalledOldItems))
.put(Envelope.FieldName.SOURCE, SourceInfo.toStruct(sourceInfo))
.put(Envelope.FieldName.OPERATION, op.code())
.put(Envelope.FieldName.TIMESTAMP, arrivalTimestamp.toEpochMilli());

if (unMarshalledOldItems == null) {
valueData.put(Envelope.FieldName.OLD_DOCUMENT, null);
} else {
valueData.put(Envelope.FieldName.OLD_DOCUMENT, objectMapper.writeValueAsString(unMarshalledOldItems));
}

return new SourceRecord(
Collections.singletonMap("table_name", sourceInfo.tableName),
offsets,
Expand Down

0 comments on commit f920119

Please sign in to comment.