Skip to content

Commit

Permalink
Merge pull request #5 from trustpilot/fallback-to-legacy-keys-smt
Browse files Browse the repository at this point in the history
Fallback to legacy keys smt
  • Loading branch information
raimondast authored Feb 23, 2018
2 parents 578507a + 3e6f1f1 commit 500b586
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package io.debezium.connector.mongodb.transforms;

import com.mongodb.util.JSON;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.bson.types.ObjectId;

import java.util.Map;

import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;

/**
 * SMT that falls back to the legacy Debezium MongoDB message key format that was
 * deprecated (http://debezium.io/docs/releases/#release-0-6-0).
 * Use it to prevent consumers from breaking after a Debezium MongoDB connector upgrade.
 *
 * Usage example:
 *
 * ...
 * "transforms": "fallback_keys",
 * "transforms.fallback_keys.type": "io.debezium.connector.mongodb.transforms.FallbackToLegacyMessageKeyFormat",
 *
 * @param <R> the subtype of {@link ConnectRecord} on which this transformation will operate
 * @author Raimondas Tijunaitis
 */

public class FallbackToLegacyMessageKeyFormat<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final String PURPOSE = "field replacement";

    /** Maps each incoming key schema to its legacy ("_id") counterpart; initialized in {@link #configure(Map)}. */
    private Cache<Schema, Schema> schemaUpdateCache;

    /**
     * Rewrites the record key from the new {@code {"id": <extended-JSON>}} format to the
     * legacy {@code {"_id": <literal>}} format; the record value is left untouched.
     *
     * @param record the record whose key will be rewritten; the key must be a {@link Struct}
     * @return a new record with the legacy-format key
     */
    @Override
    public R apply(R record) {
        final Struct key = requireStruct(record.key(), PURPOSE);

        Schema updatedSchema = schemaUpdateCache.get(key.schema());
        if (updatedSchema == null) {
            updatedSchema = makeUpdatedSchema(key.schema());
            schemaUpdateCache.put(key.schema(), updatedSchema);
        }

        final Struct updatedKey = new Struct(updatedSchema);
        final String idJson = key.getString("id");

        // The new key format stores the id as extended JSON; parse it back and render
        // the legacy literal form (e.g. an ObjectId hex string).
        updatedKey.put(updatedSchema.field("_id").name(), objectIdLiteral(JSON.parse(idJson)));

        return newRecord(record, updatedSchema, updatedKey);
    }

    /**
     * Builds a key schema identical to {@code schema} but with the "id" field renamed to "_id".
     *
     * @throws IllegalStateException if the incoming key schema has no "id" field
     */
    private Schema makeUpdatedSchema(Schema schema) {
        final Field idField = schema.field("id");
        if (idField == null) {
            throw new IllegalStateException("Record key schema has no 'id' field; cannot build legacy key schema");
        }
        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
        builder.field("_id", idField.schema());
        return builder.build();
    }

    /** Creates a copy of {@code record} carrying the rewritten key; value and metadata are preserved. */
    protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
        return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp());
    }

    /**
     * Renders the parsed id value as the legacy string literal: hex string for an
     * {@link ObjectId}, the string itself for a {@link String}, {@code toString()} otherwise.
     *
     * @param id the parsed id value; may be {@code null}
     * @return the legacy literal, or {@code null} if {@code id} is {@code null}
     */
    protected String objectIdLiteral(Object id) {
        if (id == null) {
            return null;
        }
        if (id instanceof ObjectId) {
            return ((ObjectId) id).toHexString();
        }
        if (id instanceof String) {
            return (String) id;
        }
        return id.toString();
    }

    @Override
    public ConfigDef config() {
        // This transformation takes no configuration options.
        return new ConfigDef();
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public void configure(Map<String, ?> map) {
        // Small bounded cache: connectors typically emit only a handful of distinct key schemas.
        schemaUpdateCache = new SynchronizedCache<>(new LRUCache<Schema, Schema>(16));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package io.debezium.connector.mongodb.transforms;

import io.debezium.connector.mongodb.CollectionId;
import io.debezium.connector.mongodb.RecordMakers;
import io.debezium.connector.mongodb.SourceInfo;
import io.debezium.connector.mongodb.TopicSelector;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.*;

import static org.fest.assertions.Assertions.assertThat;

/**
 * Unit test for {@link FallbackToLegacyMessageKeyFormat}. It uses {@link RecordMakers}
 * to assemble source records as the connector would emit them and feeds them to
 * the SMT.
 *
 * @author Raimondas Tijunaitis
 */

public class FallbackToLegacyMessageKeyFormatTest {
    // NOTE(review): SERVER_NAME already ends with '.' so PREFIX resolves to "serverX.." —
    // looks unintentional, but the topic name is irrelevant to these assertions; confirm intent.
    private static final String SERVER_NAME = "serverX.";
    private static final String PREFIX = SERVER_NAME + ".";

    private SourceInfo source;
    private RecordMakers recordMakers;
    private TopicSelector topicSelector;
    private List<SourceRecord> produced;

    private FallbackToLegacyMessageKeyFormat<SourceRecord> transformation;

    @Before
    public void setup() {
        source = new SourceInfo(SERVER_NAME);
        topicSelector = TopicSelector.defaultSelector(PREFIX);
        produced = new ArrayList<>();
        recordMakers = new RecordMakers(source, topicSelector, produced::add);
        // Diamond operator instead of a raw type so the field's generic parameter is honored.
        transformation = new FallbackToLegacyMessageKeyFormat<>();
    }

    @After
    public void closeSmt() {
        transformation.close();
    }

    @Test
    public void shouldMapKeyToLegacyDebeziumStructureTest() throws InterruptedException {

        ObjectId id = new ObjectId();
        Document after = new Document().append("_id", id)
                .append("name", "Sally")
                .append("phone", 123L)
                .append("active", true)
                .append("scores", Arrays.asList(1.2, 3.4, 5.6));

        SourceRecord record = createSourceRecord(after);

        // Sanity check: the connector emits the new key format ("id" holding extended JSON).
        assertThat(((Struct)record.key()).get("id")).isNotNull();
        assertThat(((Struct)record.key()).getString("id")).isNotEqualTo(id.toString());

        // when
        Map<String, String> cfg = new HashMap<>();

        transformation.configure(cfg);

        SourceRecord transformed = transformation.apply(record);
        assertThat(((Struct)transformed.key()).get("_id")).isNotNull();
        assertThat(((Struct)transformed.key()).getString("_id")).isEqualTo(id.toString());
        // close() is handled by the @After hook; no explicit call needed here.
    }


    /** Builds an insert oplog event for {@code after} and returns the record the connector produced. */
    private SourceRecord createSourceRecord(Document after) throws InterruptedException {
        CollectionId collectionId = new CollectionId("rs0", "dbA", "c1");
        BsonTimestamp ts = new BsonTimestamp(1000, 1);

        Document event = new Document().append("o", after)
                .append("ns", "dbA.c1")
                .append("ts", ts)
                .append("h", Long.valueOf(12345678))
                .append("op", "i");
        RecordMakers.RecordsForCollection records = recordMakers.forCollection(collectionId);
        records.recordEvent(event, 1002);
        return produced.get(produced.size() - 1);
    }

}

0 comments on commit 500b586

Please sign in to comment.