From 3e6f1f1088c6fda66a345bd0f0458676466bafba Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Raimondas=20Tij=C5=ABnaitis?=
Date: Thu, 22 Feb 2018 15:09:21 +0200
Subject: [PATCH] SMT to fallback message keys to legacy debezium mongodb format

---
 .../FallbackToLegacyMessageKeyFormat.java     | 97 +++++++++++++++++++
 .../FallbackToLegacyMessageKeyFormatTest.java | 94 ++++++++++++++++++
 2 files changed, 191 insertions(+)
 create mode 100644 debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/FallbackToLegacyMessageKeyFormat.java
 create mode 100644 debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/FallbackToLegacyMessageKeyFormatTest.java

diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/FallbackToLegacyMessageKeyFormat.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/FallbackToLegacyMessageKeyFormat.java
new file mode 100644
index 00000000000..845a6324aa8
--- /dev/null
+++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/FallbackToLegacyMessageKeyFormat.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://debezium.io/license/
+ */
+package io.debezium.connector.mongodb.transforms;
+
+import java.util.Map;
+
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.bson.types.ObjectId;
+
+import com.mongodb.util.JSON;
+
+import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+
+/**
+ * SMT to fall back to the legacy Debezium MongoDB key format that was deprecated
+ * (http://debezium.io/docs/releases/#release-0-6-0). Use it to prevent consumers
+ * from breaking after a Debezium MongoDB connector upgrade.
+ *
+ * Usage example:
+ *
+ * <pre>
+ * "transforms": "fallback_keys",
+ * "transforms.fallback_keys.type": "io.debezium.connector.mongodb.transforms.FallbackToLegacyMessageKeyFormat",
+ * </pre>
+ *
+ * @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate
+ * @author Raimondas Tijunaitis
+ */
+public class FallbackToLegacyMessageKeyFormat<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    private static final String PURPOSE = "field replacement";
+
+    // Maps each seen key schema (with an "id" field) to its legacy counterpart
+    // (with "_id"); bounded so it cannot grow without limit.
+    private Cache<Schema, Schema> schemaUpdateCache;
+
+    /**
+     * Rewrites the record key from the new single-field {@code "id"} format
+     * (extended JSON string) to the legacy {@code "_id"} literal format.
+     */
+    @Override
+    public R apply(R record) {
+        final Struct value = requireStruct(record.key(), PURPOSE);
+
+        Schema updatedSchema = schemaUpdateCache.get(value.schema());
+        if (updatedSchema == null) {
+            updatedSchema = makeUpdatedSchema(value.schema());
+            schemaUpdateCache.put(value.schema(), updatedSchema);
+        }
+
+        final Field field = updatedSchema.field("_id");
+        final Struct updatedValue = new Struct(updatedSchema);
+        // The new key format stores the document id as extended JSON in the "id" field.
+        final String fieldValue = value.getString("id");
+
+        updatedValue.put(field.name(), objectIdLiteral(JSON.parse(fieldValue)));
+
+        return newRecord(record, updatedSchema, updatedValue);
+    }
+
+    /**
+     * Builds the legacy key schema: same schema basics as the input, but with
+     * the "id" field renamed to "_id".
+     */
+    private Schema makeUpdatedSchema(Schema schema) {
+        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
+        builder.field("_id", schema.field("id").schema());
+        return builder.build();
+    }
+
+    /** Copies {@code record}, replacing only its key schema and key value. */
+    protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
+        return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue,
+                record.valueSchema(), record.value(), record.timestamp());
+    }
+
+    /**
+     * Renders the parsed document id the way the legacy format did: an
+     * {@link ObjectId} as its hex string, a string verbatim, anything else via
+     * {@link Object#toString()}; {@code null} stays {@code null}.
+     */
+    protected String objectIdLiteral(Object id) {
+        if (id == null) {
+            return null;
+        }
+        if (id instanceof ObjectId) {
+            return ((ObjectId) id).toHexString();
+        }
+        if (id instanceof String) {
+            return (String) id;
+        }
+        return id.toString();
+    }
+
+    @Override
+    public ConfigDef config() {
+        // No configuration options; the transformation is parameter-free.
+        return new ConfigDef();
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+        schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16));
+    }
+}
diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/FallbackToLegacyMessageKeyFormatTest.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/FallbackToLegacyMessageKeyFormatTest.java
new file mode 100644
index 00000000000..92077393be3
--- /dev/null
+++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/FallbackToLegacyMessageKeyFormatTest.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://debezium.io/license/
+ */
+package io.debezium.connector.mongodb.transforms;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.bson.BsonTimestamp;
+import org.bson.Document;
+import org.bson.types.ObjectId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.debezium.connector.mongodb.CollectionId;
+import io.debezium.connector.mongodb.RecordMakers;
+import io.debezium.connector.mongodb.SourceInfo;
+import io.debezium.connector.mongodb.TopicSelector;
+
+import static org.fest.assertions.Assertions.assertThat;
+
+/**
+ * Unit test for {@link FallbackToLegacyMessageKeyFormat}. It uses {@link RecordMakers}
+ * to assemble source records as the connector would emit them and feeds them to
+ * the SMT.
+ *
+ * @author Raimondas Tijunaitis
+ */
+public class FallbackToLegacyMessageKeyFormatTest {
+
+    // NOTE(review): SERVER_NAME already ends with '.', so PREFIX is "serverX.." —
+    // confirm the double dot is intended before merging.
+    private static final String SERVER_NAME = "serverX.";
+    private static final String PREFIX = SERVER_NAME + ".";
+
+    private SourceInfo source;
+    private RecordMakers recordMakers;
+    private TopicSelector topicSelector;
+    private List<SourceRecord> produced;
+
+    private FallbackToLegacyMessageKeyFormat<SourceRecord> transformation;
+
+    @Before
+    public void setup() {
+        source = new SourceInfo(SERVER_NAME);
+        topicSelector = TopicSelector.defaultSelector(PREFIX);
+        produced = new ArrayList<>();
+        recordMakers = new RecordMakers(source, topicSelector, produced::add);
+        transformation = new FallbackToLegacyMessageKeyFormat<>();
+    }
+
+    @After
+    public void closeSmt() {
+        transformation.close();
+    }
+
+    @Test
+    public void shouldMapKeyToLegacyDebeziumStructureTest() throws InterruptedException {
+        // given: an insert event whose key uses the new single-field "id" format
+        ObjectId id = new ObjectId();
+        Document after = new Document().append("_id", id)
+                .append("name", "Sally")
+                .append("phone", 123L)
+                .append("active", true)
+                .append("scores", Arrays.asList(1.2, 3.4, 5.6));
+
+        SourceRecord record = createSourceRecord(after);
+
+        // sanity check: the new format carries the id as extended JSON, not the plain hex string
+        assertThat(((Struct) record.key()).get("id")).isNotNull();
+        assertThat(((Struct) record.key()).getString("id")).isNotEqualTo(id.toString());
+
+        // when
+        Map<String, Object> cfg = new HashMap<>();
+        transformation.configure(cfg);
+
+        SourceRecord transformed = transformation.apply(record);
+
+        // then: the key is a struct with a single "_id" field holding the legacy literal
+        assertThat(((Struct) transformed.key()).get("_id")).isNotNull();
+        assertThat(((Struct) transformed.key()).getString("_id")).isEqualTo(id.toString());
+    }
+
+    /**
+     * Feeds one insert oplog event for collection dbA.c1 through
+     * {@link RecordMakers} and returns the last record produced.
+     */
+    private SourceRecord createSourceRecord(Document after) throws InterruptedException {
+        CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");
+        BsonTimestamp ts = new BsonTimestamp(1000, 1);
+
+        Document event = new Document().append("o", after)
+                .append("ns", "dbA.c1")
+                .append("ts", ts)
+                .append("h", Long.valueOf(12345678))
+                .append("op", "i");
+        RecordMakers.RecordsForCollection records = recordMakers.forCollection(collectionId);
+        records.recordEvent(event, 1002);
+        return produced.get(produced.size() - 1);
+    }
+}