From a3c68a2be69932fbb68cd3f94afb3a5b56d8769f Mon Sep 17 00:00:00 2001
From: Sergey Avseyev
Date: Sat, 24 Sep 2016 05:14:46 +0300
Subject: [PATCH] KAFKAC-47: maintain replication state and recover it after
 disruption

---
 README.md                                     |  2 +-
 pom.xml                                       |  4 +-
 .../connect/kafka/CouchbaseMonitorThread.java | 37 ++++++++++----
 .../connect/kafka/CouchbaseSourceTask.java    | 48 +++++++++++++++----
 4 files changed, 70 insertions(+), 21 deletions(-)

diff --git a/README.md b/README.md
index ba6a784..9956641 100644
--- a/README.md
+++ b/README.md
@@ -46,7 +46,7 @@ Start the Schema Registry, also in its own terminal.
 Now, run the connector in a standalone Kafka Connect worker in another terminal (this assumes
 Avro settings and that Kafka and the Schema Registry are running locally on the default ports):
 
-    $ sudo connect-standalone /etc/schema-registry/connect-avro-standalone.properties \
+    $ sudo connect-standalone /etc/kafka/connect-standalone.properties \
         /etc/kafka-connect-couchbase/quickstart-couchbase.properties
 
 To observe replicated events from the cluster, run CLI kafka consumer:
diff --git a/pom.xml b/pom.xml
index 79a14c3..2cfb118 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,9 +52,9 @@
     <properties>
        <java.version>1.6</java.version>
        <confluent.version>3.0.0</confluent.version>
-       <kafka.version>0.10.0.0-cp1</kafka.version>
+       <kafka.version>0.10.0.1</kafka.version>
        <junit.version>4.12</junit.version>
-       <couchbase.dcp.version>0.1.0</couchbase.dcp.version>
+       <couchbase.dcp.version>0.3.0</couchbase.dcp.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
    </properties>
diff --git a/src/main/java/com/couchbase/connect/kafka/CouchbaseMonitorThread.java b/src/main/java/com/couchbase/connect/kafka/CouchbaseMonitorThread.java
index baca627..2b9d8b9 100644
--- a/src/main/java/com/couchbase/connect/kafka/CouchbaseMonitorThread.java
+++ b/src/main/java/com/couchbase/connect/kafka/CouchbaseMonitorThread.java
@@ -19,14 +19,18 @@
 import com.couchbase.client.dcp.Client;
 import com.couchbase.client.dcp.ControlEventHandler;
 import com.couchbase.client.dcp.DataEventHandler;
+import com.couchbase.client.dcp.StreamFrom;
+import com.couchbase.client.dcp.StreamTo;
 import com.couchbase.client.dcp.config.DcpControl;
+import com.couchbase.client.dcp.message.DcpFailoverLogResponse;
 import com.couchbase.client.dcp.message.DcpSnapshotMarkerMessage;
-import com.couchbase.client.dcp.message.MessageUtil;
+import com.couchbase.client.dcp.state.PartitionState;
+import com.couchbase.client.dcp.state.SessionState;
+import com.couchbase.client.dcp.state.StateFormat;
 import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
-import org.apache.kafka.common.config.types.Password;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import rx.Subscription;
+import rx.functions.Action1;
 
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -36,13 +40,14 @@ public class CouchbaseMonitorThread extends Thread {
 
     private final long connectionTimeout;
     private final Client client;
-    private final Integer[] partitions;
-    private Subscription subscription;
+    private final Short[] partitions;
+    private final SessionState initialSessionState;
 
     public CouchbaseMonitorThread(List<String> clusterAddress, String bucket, String password, long connectionTimeout,
-                                  final BlockingQueue<ByteBuf> queue, Integer[] partitions) {
+                                  final BlockingQueue<ByteBuf> queue, Short[] partitions, SessionState sessionState) {
         this.connectionTimeout = connectionTimeout;
         this.partitions = partitions;
+        this.initialSessionState = sessionState;
         client = Client.configure()
                 .hostnames(clusterAddress)
                 .bucket(bucket)
@@ -78,12 +83,26 @@ public void acknowledgeBuffer(ByteBuf event) {
     @Override
     public void run() {
         client.connect().await(); // FIXME: uncomment and raise timeout exception: .await(connectionTimeout, TimeUnit.MILLISECONDS);
-        client.initializeFromBeginningToNoEnd().await();
-        subscription = client.startStreams(partitions).subscribe();
+        client.initializeState(StreamFrom.BEGINNING, StreamTo.INFINITY).await();
+        client.failoverLogs(partitions).forEach(new Action1<ByteBuf>() {
+            @Override
+            public void call(ByteBuf event) {
+                short partition = DcpFailoverLogResponse.vbucket(event);
+                int numEntries = DcpFailoverLogResponse.numLogEntries(event);
+                PartitionState ps = initialSessionState.get(partition);
+                for (int i = 0; i < numEntries; i++) {
+                    ps.addToFailoverLog(
+                            DcpFailoverLogResponse.seqnoEntry(event, i),
+                            DcpFailoverLogResponse.vbuuidEntry(event, i)
+                    );
+                }
+                client.sessionState().set(partition, ps);
+            }
+        });
+        client.startStreaming(partitions).await();
     }
 
     public void shutdown() {
-        subscription.unsubscribe();
         client.disconnect().await();
     }
 }
diff --git a/src/main/java/com/couchbase/connect/kafka/CouchbaseSourceTask.java b/src/main/java/com/couchbase/connect/kafka/CouchbaseSourceTask.java
index cecceb4..71989f4 100644
--- a/src/main/java/com/couchbase/connect/kafka/CouchbaseSourceTask.java
+++ b/src/main/java/com/couchbase/connect/kafka/CouchbaseSourceTask.java
@@ -19,6 +19,9 @@
 import com.couchbase.client.dcp.message.DcpDeletionMessage;
 import com.couchbase.client.dcp.message.DcpExpirationMessage;
 import com.couchbase.client.dcp.message.DcpMutationMessage;
+import com.couchbase.client.dcp.state.PartitionState;
+import com.couchbase.client.dcp.state.SessionState;
+import com.couchbase.client.dcp.state.StateFormat;
 import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
 import com.couchbase.client.deps.io.netty.util.CharsetUtil;
 import com.couchbase.connect.kafka.dcp.EventType;
@@ -53,6 +56,7 @@ public class CouchbaseSourceTask extends SourceTask {
     private BlockingQueue<ByteBuf> queue;
     private String topic;
     private String bucket;
+    private volatile boolean running;
 
     @Override
     public String version() {
@@ -75,13 +79,37 @@ public void start(Map<String, String> properties) {
         long connectionTimeout = config.getLong(CouchbaseSourceConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG);
         List<String> partitionsList = config.getList(CouchbaseSourceTaskConfig.PARTITIONS_CONFIG);
 
-        Integer[] partitions = new Integer[partitionsList.size()];
+
+        Short[] partitions = new Short[partitionsList.size()];
+        List<Map<String, String>> kafkaPartitions = new ArrayList<Map<String, String>>(1);
         for (int i = 0; i < partitionsList.size(); i++) {
-            partitions[i] = Integer.parseInt(partitionsList.get(i));
+            partitions[i] = Short.parseShort(partitionsList.get(i));
+            Map<String, String> kafkaPartition = new HashMap<String, String>(2);
+            kafkaPartition.put("bucket", bucket);
+            kafkaPartition.put("partition", partitions[i].toString());
+            kafkaPartitions.add(kafkaPartition);
+        }
+        Map<Map<String, String>, Map<String, Object>> offsets = context.offsetStorageReader().offsets(kafkaPartitions);
+        SessionState sessionState = new SessionState();
+        sessionState.setToBeginningWithNoEnd(1024); // FIXME: literal
+        for (Map<String, String> kafkaPartition : kafkaPartitions) {
+            Map<String, Object> offset = offsets.get(kafkaPartition);
+            Short partition = Short.parseShort(kafkaPartition.get("partition"));
+            PartitionState partitionState = sessionState.get(partition);
+            long startSeqno = 0;
+            if (offset != null && offset.containsKey("bySeqno")) {
+                startSeqno = (Long) offset.get("bySeqno");
+            }
+            partitionState.setStartSeqno(startSeqno);
+            partitionState.setEndSeqno(0xffffffff);
+            partitionState.setSnapshotStartSeqno(startSeqno);
+            partitionState.setSnapshotEndSeqno(startSeqno);
+            sessionState.set(partition, partitionState);
         }
+        running = true;
 
         queue = new LinkedBlockingQueue<ByteBuf>();
-        couchbaseMonitorThread = new CouchbaseMonitorThread(clusterAddress, bucket, password, connectionTimeout, queue, partitions);
+        couchbaseMonitorThread = new CouchbaseMonitorThread(clusterAddress, bucket, password, connectionTimeout, queue, partitions, sessionState);
         couchbaseMonitorThread.start();
     }
 
@@ -99,7 +127,7 @@ private static List<String> getList(CouchbaseSourceConnectorConfig config, Strin
     public List<SourceRecord> poll() throws InterruptedException {
         List<SourceRecord> results = new ArrayList<SourceRecord>();
 
-        while (true) {
+        while (running) {
             ByteBuf event = queue.poll(100, TimeUnit.MILLISECONDS);
             if (event != null) {
                 SourceRecord record = convert(event);
@@ -113,6 +141,7 @@ public List<SourceRecord> poll() throws InterruptedException {
                 return results;
             }
         }
+        return results;
     }
 
     public SourceRecord convert(ByteBuf event) {
@@ -138,16 +167,16 @@ public SourceRecord convert(ByteBuf event) {
             record.put("revSeqno", DcpDeletionMessage.revisionSeqno(event));
         } else if (DcpExpirationMessage.is(event)) {
             record.put("partition", DcpExpirationMessage.partition(event));
-            // FIXME: uncomment in next version
-            // record.put("key", bufToString(DcpExpirationMessage.key(event)));
-            // record.put("cas", DcpExpirationMessage.cas(event));
+            record.put("key", bufToString(DcpExpirationMessage.key(event)));
+            record.put("cas", DcpExpirationMessage.cas(event));
             record.put("bySeqno", DcpExpirationMessage.bySeqno(event));
             record.put("revSeqno", DcpExpirationMessage.revisionSeqno(event));
         }
 
         final Map<String, Object> offset = new HashMap<String, Object>(2);
-        offset.put("partition", record.getInt16("partition"));
         offset.put("bySeqno", record.getInt64("bySeqno"));
-        final Map<String, String> partition = Collections.singletonMap("bucket", bucket);
+        final Map<String, String> partition = new HashMap<String, String>(2);
+        partition.put("bucket", bucket);
+        partition.put("partition", record.getInt16("partition").toString());
         return new SourceRecord(partition, offset, topic, schema, record);
     }
@@ -156,6 +185,7 @@ public SourceRecord convert(ByteBuf event) {
 
     @Override
     public void stop() {
+        running = false;
         couchbaseMonitorThread.shutdown();
         try {
             couchbaseMonitorThread.join(MAX_TIMEOUT);
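
To follow the offset round trip this patch introduces: convert() now keys every record by {bucket, partition} and stores the last streamed bySeqno as the Connect offset, and start() reads those same keys back through context.offsetStorageReader() to seed the DCP SessionState. Below is a minimal, self-contained sketch of that key/offset mapping (plain JDK only; the class OffsetRoundTripSketch and its methods are hypothetical illustrations, not connector API):

    import java.util.HashMap;
    import java.util.Map;

    public class OffsetRoundTripSketch {

        // Key under which Kafka Connect stores the offset of one Couchbase
        // vBucket; mirrors the maps built in convert() and start() above.
        static Map<String, String> sourcePartition(String bucket, short vbucket) {
            Map<String, String> partition = new HashMap<String, String>(2);
            partition.put("bucket", bucket);
            partition.put("partition", Short.toString(vbucket));
            return partition;
        }

        // Sequence number to resume from: the stored bySeqno when this
        // vBucket has been streamed before, otherwise 0 (the beginning).
        static long resumeSeqno(Map<String, Object> storedOffset) {
            if (storedOffset != null && storedOffset.containsKey("bySeqno")) {
                return (Long) storedOffset.get("bySeqno");
            }
            return 0L;
        }

        public static void main(String[] args) {
            Map<String, Object> offset = new HashMap<String, Object>(1);
            offset.put("bySeqno", 42L);

            System.out.println(sourcePartition("default", (short) 5)); // two-entry key map
            System.out.println(resumeSeqno(offset)); // 42
            System.out.println(resumeSeqno(null));   // 0
        }
    }

Keying offsets by vBucket rather than by bucket alone is what lets a restarted task resume each DCP partition (1024 of them, hard-coded with a FIXME above) independently instead of replaying every stream from the beginning.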