Skip to content

Commit

Permalink
Merge pull request #4 from trustpilot/filter-fields-smt
Browse files Browse the repository at this point in the history
filter fields SMT
  • Loading branch information
raimondast authored Feb 20, 2018
2 parents b8f5e84 + d543e9c commit 578507a
Show file tree
Hide file tree
Showing 2 changed files with 352 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package io.debezium.connector.mongodb.transforms;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;

import java.util.*;

import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;


/**
 * Single Message Transformation (SMT) that filters fields inside the JSON documents carried
 * by the {@code after} and {@code patch} fields of a MongoDB change event envelope.
 *
 * <p>Each field is identified by its dotted path from the document root. Fields of documents
 * held inside an array use the array name followed by {@code []}, e.g. {@code array[].p1}.
 * A field is kept when it is not blacklisted and either the whitelist is empty or the
 * whitelist contains its path. The blacklist takes precedence over the whitelist.
 *
 * <p>The check is applied to every level of the path: when a whitelist is configured, a nested
 * field survives only if each of its ancestors is whitelisted as well (to keep
 * {@code nested.p1}, list both {@code nested} and {@code nested.p1}). Removing a parent removes
 * everything below it.
 *
 * <p>Usage example:
 *
 * <pre>
 * ...
 * "transforms": "filterfields",
 * "transforms.filterfields.type": "io.debezium.connector.mongodb.transforms.FilterFieldsFromMongoDbEnvelope",
 * "transforms.filterfields.whitelist": "a,nested,nested.p1,array,array[].p1",
 * "transforms.filterfields.blacklist": "b"
 * </pre>
 *
 * @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate
 * @author Raimondas Tijunaitis
 */
public class FilterFieldsFromMongoDbEnvelope<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final JsonWriterSettings WRITER_SETTINGS = new JsonWriterSettings(JsonMode.STRICT, "", "");

    public static final String OVERVIEW_DOC = "Filter fields from MongoDB envelope in 'patch' and 'after' fields.";

    /** Names of the configuration options understood by this transformation. */
    interface ConfigName {
        String BLACKLIST = "blacklist";
        String WHITELIST = "whitelist";
    }

    /** Envelope fields whose JSON payload is filtered, in processing order. */
    private static final String[] ENVELOPE_FIELDS = {"after", "patch"};

    // Initialised to empty sets so that filter() is safe even before configure() is called;
    // with both sets empty every field is retained.
    private Set<String> blacklist = Collections.emptySet();
    private Set<String> whitelist = Collections.emptySet();

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(ConfigName.BLACKLIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
                    "Fields to exclude. This takes precedence over the whitelist.")
            .define(ConfigName.WHITELIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
                    "Fields to include. If specified, only these fields will be used.");

    /**
     * Filters the JSON documents in the {@code after} and {@code patch} fields of the record value.
     * The value {@link Struct} is modified in place and the same record instance is returned.
     *
     * <p>Records whose value is {@code null} (for example tombstones) or is not a {@link Struct}
     * are returned untouched, as are envelope fields that are absent from the schema or unset.
     *
     * @param r the record to transform
     * @return the same record, with its envelope documents filtered
     */
    @Override
    public R apply(R r) {
        Object value = r.value();
        if (!(value instanceof Struct)) {
            // Covers null values (tombstones): there is no envelope to filter.
            return r;
        }

        Struct doc = (Struct) value;
        for (String docName : ENVELOPE_FIELDS) {
            // Struct.get(String) throws for names unknown to the schema, so check the schema first.
            if (doc.schema().field(docName) != null && doc.get(docName) != null) {
                filterFields(doc, docName);
            }
        }

        return r;
    }

    /**
     * Parses the JSON string stored in {@code docName}, filters it and writes the result back
     * into the same field of {@code doc}.
     *
     * @param doc the envelope value
     * @param docName name of the string field holding the JSON document
     */
    private void filterFields(Struct doc, String docName) {
        BsonDocument parsed = BsonDocument.parse(doc.getString(docName));
        filterFields(parsed.entrySet(), "");
        doc.put(docName, parsed.toJson(WRITER_SETTINGS));
    }

    /**
     * Recursively removes from {@code set} every entry rejected by {@link #filter(String)}.
     *
     * @param set live entry set of the document being filtered; entries are removed through its iterator
     * @param path dotted path prefix of the enclosing document (empty for the root, otherwise
     *             ending in {@code "."})
     */
    private void filterFields(Set<Map.Entry<String, BsonValue>> set, String path) {
        for (Iterator<Map.Entry<String, BsonValue>> entries = set.iterator(); entries.hasNext(); ) {
            Map.Entry<String, BsonValue> entry = entries.next();
            String fieldPath = path + entry.getKey();

            if (!filter(fieldPath)) {
                // The whole subtree goes away, so there is no point in descending into it.
                entries.remove();
                continue;
            }

            BsonValue value = entry.getValue();
            if (value instanceof BsonDocument) {
                filterFields(((BsonDocument) value).entrySet(), fieldPath + ".");
            }
            else if (value instanceof BsonArray) {
                // Only documents that are direct elements of the array are filtered.
                for (BsonValue element : (BsonArray) value) {
                    if (element instanceof BsonDocument) {
                        filterFields(((BsonDocument) element).entrySet(), fieldPath + "[].");
                    }
                }
            }
        }
    }

    /**
     * @return the definition of the configuration options accepted by this transformation
     */
    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    /** Nothing to release. */
    @Override
    public void close() {
    }

    /**
     * Reads the blacklist and whitelist from the supplied configuration.
     *
     * @param map the transformation configuration
     */
    @Override
    public void configure(Map<String, ?> map) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, map);
        // Sets give constant-time lookups; filter() is called once per field of every document.
        blacklist = new HashSet<>(config.getList(ConfigName.BLACKLIST));
        whitelist = new HashSet<>(config.getList(ConfigName.WHITELIST));
    }

    /**
     * Decides whether a field is retained.
     *
     * @param fieldName full path of the field, e.g. {@code nested.p1} or {@code array[].p1}
     * @return {@code true} if the field is not blacklisted and the whitelist is either empty or
     *         contains the field
     */
    boolean filter(String fieldName) {
        return !blacklist.contains(fieldName) && (whitelist.isEmpty() || whitelist.contains(fieldName));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.transforms;

import io.debezium.connector.mongodb.CollectionId;
import io.debezium.connector.mongodb.RecordMakers;
import io.debezium.connector.mongodb.RecordMakers.RecordsForCollection;
import io.debezium.connector.mongodb.SourceInfo;
import io.debezium.connector.mongodb.TopicSelector;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import javax.print.Doc;
import java.util.*;

import static org.fest.assertions.Assertions.assertThat;

/**
 * Unit test for {@link FilterFieldsFromMongoDbEnvelope}. It uses {@link RecordMakers}
 * to assemble source records as the connector would emit them and feeds them to
 * the SMT.
 *
 * <p>Every test builds an <em>input</em> document and the document <em>expected</em> after
 * filtering, turns both into insert events and checks that the transformed input carries the
 * same JSON as the untouched expected record.
 *
 * @author Raimondas Tijunaitis
 */
public class FilterFieldsFromMongoDbEnvelopeTest {

    private static final String SERVER_NAME = "serverX.";
    private static final String PREFIX = SERVER_NAME + ".";

    /** Envelope field inspected by the insert-event tests. */
    private static final String AFTER = "after";

    private SourceInfo source;
    private RecordMakers recordMakers;
    private TopicSelector topicSelector;
    private List<SourceRecord> produced;

    private FilterFieldsFromMongoDbEnvelope<SourceRecord> transformation;

    @Before
    public void setup() {
        source = new SourceInfo(SERVER_NAME);
        topicSelector = TopicSelector.defaultSelector(PREFIX);
        produced = new ArrayList<>();
        recordMakers = new RecordMakers(source, topicSelector, produced::add);
        transformation = new FilterFieldsFromMongoDbEnvelope<>();
    }

    @After
    public void closeSmt() {
        transformation.close();
    }

    @Test
    public void shouldTransformRecordRemovingBlacklistedFieldForInsertEvent() throws InterruptedException {
        ObjectId id = new ObjectId();
        Document expected = sally(id);

        Document input = sally(id)
                .append("nest", new Document().append("p1", 1));

        compareEvents(expected, input, Collections.singletonMap("blacklist", "nest"), AFTER);
    }

    @Test
    public void shouldTransformRecordRemovingBlacklistedNestedFieldForInsertEvent() throws InterruptedException {
        ObjectId id = new ObjectId();
        Document expected = sally(id)
                .append("nest", new Document().append("p2", 2));

        Document input = sally(id)
                .append("nest", new Document().append("p1", 1).append("p2", 2));

        compareEvents(expected, input, Collections.singletonMap("blacklist", "nest.p1"), AFTER);
    }

    @Test
    public void shouldTransformRecordRemovingBlacklistedAllNestedFieldForInsertEvent() throws InterruptedException {
        ObjectId id = new ObjectId();
        // Removing every child leaves the (now empty) parent document in place.
        Document expected = sally(id)
                .append("nest", new Document());

        Document input = sally(id)
                .append("nest", new Document().append("p1", 1).append("p2", 2));

        compareEvents(expected, input, Collections.singletonMap("blacklist", "nest.p1,nest.p2"), AFTER);
    }

    @Test
    public void shouldTransformRecordRemovingBlacklistedMultiNestedFieldForInsertEvent() throws InterruptedException {
        ObjectId id = new ObjectId();
        Document expected = sally(id)
                .append("nest", new Document().append("p1", 1).append("p2", new Document().append("pp2", 2)));

        Document input = sally(id)
                .append("nest", new Document().append("p1", 1).append("p2", new Document().append("pp1", 1).append("pp2", 2)));

        compareEvents(expected, input, Collections.singletonMap("blacklist", "nest.p2.pp1"), AFTER);
    }

    @Test
    public void shouldTransformRecordSelectingWhitelistedFieldOnlyForInsertEvent() throws InterruptedException {
        ObjectId id = new ObjectId();
        Document expected = new Document().append("_id", id)
                .append("name", "Sally");

        Document input = sally(id)
                .append("nest", new Document().append("p1", 1));

        compareEvents(expected, input, Collections.singletonMap("whitelist", "_id,name"), AFTER);
    }

    @Test
    public void shouldTransformRecordSelectingWhitelistedFieldsFromArraysForInsertEvent() throws InterruptedException {
        ObjectId id = new ObjectId();
        Document expected = new Document().append("_id", id)
                .append("name", "Sally")
                .append("events", Arrays.asList(
                        new Document().append("e1", "1"),
                        new Document().append("e1", "12")));

        Document input = sally(id)
                .append("events", Arrays.asList(
                        new Document().append("e1", "1").append("e2", "2"),
                        new Document().append("e1", "12").append("e2", "22")));

        compareEvents(expected, input, Collections.singletonMap("whitelist", "_id,name,events,events[].e1"), AFTER);
    }

    /**
     * Builds the fixture document shared by the tests: {@code _id}, {@code name}, {@code phone},
     * {@code active} and {@code scores}, in that order. A fresh instance is returned on every
     * call so callers may append further fields.
     */
    private static Document sally(ObjectId id) {
        return new Document().append("_id", id)
                .append("name", "Sally")
                .append("phone", 123L)
                .append("active", true)
                .append("scores", Arrays.asList(1.2, 3.4, 5.6));
    }

    /**
     * Configures the SMT with {@code cfg}, applies it to the insert event built from
     * {@code input} and asserts that the JSON in {@code docName} equals the JSON of the
     * untransformed insert event built from {@code expected}.
     *
     * <p>The transformation is closed by {@link #closeSmt()} after each test, so it is not
     * closed here.
     */
    private void compareEvents(Document expected, Document input, Map<String, String> cfg, String docName) throws InterruptedException {
        SourceRecord expectedRecord = createSourceRecord(expected);
        SourceRecord inputRecord = createSourceRecord(input);

        // when
        transformation.configure(cfg);
        SourceRecord transformed = transformation.apply(inputRecord);

        // then
        String expectedJson = getDocumentJson(expectedRecord, docName);
        String actualJson = getDocumentJson(transformed, docName);
        assertThat(expectedJson).isNotNull().isNotEmpty();
        assertThat(actualJson).isNotNull().isNotEmpty();
        assertThat(actualJson).isEqualTo(expectedJson);
    }

    /** Returns the string stored in {@code field} of the record's value struct. */
    private String getDocumentJson(SourceRecord record, String field) {
        return ((Struct) record.value()).getString(field);
    }

    /**
     * Wraps {@code obj} in an oplog-style insert event ({@code op = "i"}) for collection
     * {@code dbA.c1}, passes it through {@link RecordMakers} and returns the most recently
     * produced source record.
     */
    private SourceRecord createSourceRecord(Document obj) throws InterruptedException {
        CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");
        BsonTimestamp ts = new BsonTimestamp(1000, 1);

        Document event = new Document().append("o", obj)
                .append("ns", "dbA.c1")
                .append("ts", ts)
                .append("h", Long.valueOf(12345678))
                .append("op", "i");
        RecordsForCollection records = recordMakers.forCollection(collectionId);
        records.recordEvent(event, 1002);
        return produced.get(produced.size() - 1);
    }
}

0 comments on commit 578507a

Please sign in to comment.