From d543e9ce051388068a85135f0c4d85ec94414d0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raimondas=20Tij=C5=ABnaitis?= Date: Tue, 20 Feb 2018 13:22:08 +0200 Subject: [PATCH] filter fields SMT --- .../FilterFieldsFromMongoDbEnvelope.java | 120 +++++++++ .../FilterFieldsFromMongoDbEnvelopeTest.java | 232 ++++++++++++++++++ 2 files changed, 352 insertions(+) create mode 100644 debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/FilterFieldsFromMongoDbEnvelope.java create mode 100644 debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/FilterFieldsFromMongoDbEnvelopeTest.java diff --git a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/FilterFieldsFromMongoDbEnvelope.java b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/FilterFieldsFromMongoDbEnvelope.java new file mode 100644 index 00000000000..f10855cee0b --- /dev/null +++ b/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/FilterFieldsFromMongoDbEnvelope.java @@ -0,0 +1,120 @@ +package io.debezium.connector.mongodb.transforms; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.transforms.Transformation; +import org.apache.kafka.connect.transforms.util.SimpleConfig; +import org.bson.BsonArray; +import org.bson.BsonDocument; +import org.bson.BsonValue; +import org.bson.json.JsonMode; +import org.bson.json.JsonWriterSettings; + +import java.util.*; + +import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; + + +/** + * SMT to filter MongoDB envelope content. + * + * Usage example: + * + * ... + * "transforms": "filterfields", + * "transforms.filterfields.type": "io.debezium.connector.mongodb.transforms.FilterFieldsFromMongoDbEnvelope", + * "transforms.filterfields.whitelist": "a,nested.p1,array,array[].p1", + * "transforms.filterfields.blacklist": "b" + * + * @param the subtype of {@link ConnectRecord} on which this transformation will operate + * @author Raimondas Tijunaitis + */ + +public class FilterFieldsFromMongoDbEnvelope> implements Transformation { + + private static final JsonWriterSettings WRITER_SETTINGS = new JsonWriterSettings(JsonMode.STRICT, "", ""); + + public static final String OVERVIEW_DOC = "Filter fields from MongoDB envelope in 'patch' and 'after' fields."; + + interface ConfigName { + String BLACKLIST = "blacklist"; + String WHITELIST = "whitelist"; + } + + private List blacklist; + private List whitelist; + + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(ConfigName.BLACKLIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, + "Fields to exclude. This takes precedence over the whitelist.") + .define(ConfigName.WHITELIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, + "Fields to include. If specified, only these fields will be used."); + + @Override + public R apply(R r) { + Struct doc = (Struct) r.value(); + + if (doc.get("after") != null) { + filterFields(doc, "after"); + } + + if (doc.get("patch") != null) { + filterFields(doc, "patch"); + } + + return r; + } + + private void filterFields(Struct doc, String docName) { + BsonDocument val = BsonDocument.parse(doc.getString(docName)); + Set> set = val.entrySet(); + filterFields(set, ""); + + doc.put(docName, val.toJson(WRITER_SETTINGS)); + } + + private void filterFields(Set> set, String path) { + for (Iterator> iterator = set.iterator(); iterator.hasNext(); ) { + Map.Entry item = iterator.next(); + + if (item.getValue().getClass().equals(BsonDocument.class)) { + filterFields(((BsonDocument)item.getValue()).entrySet(), path + item.getKey() + "."); + } + + if (item.getValue().getClass().equals(BsonArray.class)) { + for (Iterator iterator2 = ((BsonArray) item.getValue()).iterator(); iterator2.hasNext(); ) { + BsonValue element = iterator2.next(); + if (element.getClass().equals(BsonDocument.class)) { + filterFields(((BsonDocument) element).entrySet(), path + item.getKey() + "[]."); + } + } + } + + if (!filter(path + item.getKey())) { + iterator.remove(); + } + } + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() { + } + + @Override + public void configure(Map map) { + final SimpleConfig config = new SimpleConfig(CONFIG_DEF, map); + blacklist = config.getList(ConfigName.BLACKLIST); + whitelist = config.getList(ConfigName.WHITELIST); + } + + boolean filter(String fieldName) { + return !blacklist.contains(fieldName) && (whitelist.isEmpty() || whitelist.contains(fieldName)); + } +} diff --git a/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/FilterFieldsFromMongoDbEnvelopeTest.java b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/FilterFieldsFromMongoDbEnvelopeTest.java new file mode 100644 index 00000000000..4d4340b3e44 --- /dev/null +++ b/debezium-connector-mongodb/src/test/java/io/debezium/connector/mongodb/transforms/FilterFieldsFromMongoDbEnvelopeTest.java @@ -0,0 +1,232 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mongodb.transforms; + +import io.debezium.connector.mongodb.CollectionId; +import io.debezium.connector.mongodb.RecordMakers; +import io.debezium.connector.mongodb.RecordMakers.RecordsForCollection; +import io.debezium.connector.mongodb.SourceInfo; +import io.debezium.connector.mongodb.TopicSelector; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.bson.BsonTimestamp; +import org.bson.Document; +import org.bson.types.ObjectId; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.print.Doc; +import java.util.*; + +import static org.fest.assertions.Assertions.assertThat; + +/** + * Unit test for {@link FilterFieldsFromMongoDbEnvelope}. It uses {@link RecordMakers} + * to assemble source records as the connector would emit them and feeds them to + * the SMT. + * + * @author Raimondas Tijunaitis + */ +public class FilterFieldsFromMongoDbEnvelopeTest { + + private static final String SERVER_NAME = "serverX."; + private static final String PREFIX = SERVER_NAME + "."; + + private SourceInfo source; + private RecordMakers recordMakers; + private TopicSelector topicSelector; + private List produced; + + private FilterFieldsFromMongoDbEnvelope transformation; + + @Before + public void setup() { + source = new SourceInfo(SERVER_NAME); + topicSelector = TopicSelector.defaultSelector(PREFIX); + produced = new ArrayList<>(); + recordMakers = new RecordMakers(source, topicSelector, produced::add); + transformation = new FilterFieldsFromMongoDbEnvelope(); + } + + @After + public void closeSmt() { + transformation.close(); + } + + @Test + public void shouldTransformRecordRemovingBlacklistedFieldForInsertEvent() throws InterruptedException { + ObjectId id = new ObjectId(); + Document after = new Document().append("_id", id) + .append("name", "Sally") + .append("phone", 123L) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)); + + Document before = new Document().append("_id", id) + .append("name", "Sally") + .append("phone", 123L) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)) + .append("nest", new Document().append("p1", 1)); + + Map cfg = new HashMap(); + cfg.put("blacklist", "nest"); + + compareEvents(after, before, cfg, "after"); + } + + @Test + public void shouldTransformRecordRemovingBlacklistedNestedFieldForInsertEvent() throws InterruptedException { + ObjectId id = new ObjectId(); + Document after = new Document().append("_id", id) + .append("name", "Sally") + .append("phone", 123L) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)) + .append("nest", new Document().append("p2", 2)); + + Document before = new Document().append("_id", id) + .append("name", "Sally") + .append("phone", 123L) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)) + .append("nest", new Document().append("p1", 1).append("p2", 2)); + + Map cfg = new HashMap(); + cfg.put("blacklist", "nest.p1"); + + compareEvents(after, before, cfg, "after"); + } + + @Test + public void shouldTransformRecordRemovingBlacklistedAllNestedFieldForInsertEvent() throws InterruptedException { + ObjectId id = new ObjectId(); + Document after = new Document().append("_id", id) + .append("name", "Sally") + .append("phone", 123L) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)) + .append("nest", new Document()); + + Document before = new Document().append("_id", id) + .append("name", "Sally") + .append("phone", 123L) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)) + .append("nest", new Document().append("p1", 1).append("p2", 2)); + + Map cfg = new HashMap(); + cfg.put("blacklist", "nest.p1,nest.p2"); + + compareEvents(after, before, cfg, "after"); + } + + @Test + public void shouldTransformRecordRemovingBlacklistedMultiNestedFieldForInsertEvent() throws InterruptedException { + ObjectId id = new ObjectId(); + Document after = new Document().append("_id", id) + .append("name", "Sally") + .append("phone", 123L) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)) + .append("nest", new Document().append("p1", 1).append("p2", new Document().append("pp2", 2))); + + Document before = new Document().append("_id", id) + .append("name", "Sally") + .append("phone", 123L) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)) + .append("nest", new Document().append("p1", 1).append("p2", new Document().append("pp1", 1).append("pp2", 2))); + + Map cfg = new HashMap(); + cfg.put("blacklist", "nest.p2.pp1"); + + compareEvents(after, before, cfg, "after"); + } + + @Test + public void shouldTransformRecordSelectingWhitelistedFieldOnlyForInsertEvent() throws InterruptedException { + ObjectId id = new ObjectId(); + Document after = new Document().append("_id", id) + .append("name", "Sally"); + + Document before = new Document().append("_id", id) + .append("name", "Sally") + .append("phone", 123L) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)) + .append("nest", new Document().append("p1", 1)); + + Map cfg = new HashMap(); + cfg.put("whitelist", "_id,name"); + + compareEvents(after, before, cfg, "after"); + } + + @Test + public void shouldTransformRecordSelectingWhitelistedFieldsFromArraysForInsertEvent() throws InterruptedException { + ObjectId id = new ObjectId(); + Document after = new Document().append("_id", id) + .append("name", "Sally") + .append("events", Arrays.asList( + new Document().append("e1", "1"), + new Document().append("e1", "12"))); + + Document before = new Document().append("_id", id) + .append("name", "Sally") + .append("phone", 123L) + .append("active", true) + .append("scores", Arrays.asList(1.2, 3.4, 5.6)) + .append("events", Arrays.asList( + new Document().append("e1", "1").append("e2", "2"), + new Document().append("e1", "12").append("e2", "22"))); + + Map cfg = new HashMap(); + cfg.put("whitelist", "_id,name,events,events[].e1"); + + compareEvents(after, before, cfg, "after"); + } + + private void compareEvents(Document doc1, Document doc2, Map cfg, String docName) throws InterruptedException { + + SourceRecord recordOrig = createSourceRecord(doc1); + SourceRecord record = createSourceRecord(doc2); + + // when + transformation.configure(cfg); + + SourceRecord transformed = transformation.apply(record); + String json1 = getDocumentJson(recordOrig, docName); + String json2 = getDocumentJson(transformed, docName); + assertThat(json1).isNotNull().isNotEmpty(); + assertThat(json2).isNotNull().isNotEmpty(); + assertThat(json2).isEqualTo(json1); + + transformation.close(); + } + + private String getDocumentJson(SourceRecord record, String field) { + return ((Struct) record.value()).getString(field); + } + + + private SourceRecord createSourceRecord(Document obj) throws InterruptedException { + CollectionId collectionId = new CollectionId("rs0", "dbA", "c1"); + BsonTimestamp ts = new BsonTimestamp(1000, 1); + + Document event = new Document().append("o", obj) + .append("ns", "dbA.c1") + .append("ts", ts) + .append("h", Long.valueOf(12345678)) + .append("op", "i"); + RecordsForCollection records = recordMakers.forCollection(collectionId); + records.recordEvent(event, 1002); + return produced.get(produced.size()-1); + } +}