diff --git a/README.md b/README.md index 029e4eb..e77ade6 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ The Amazon Kinesis Storm spout fetches data records from Amazon Kinesis and emit The Amazon Kinesis Storm spout can be configured to retry failed records. By default, it retries a failed record 3 times. If a record fails and the retry limit has been reached, the spout will log an error and skip over the record. The spout buffers pending records in memory, so it can re-emit a failed record without having to re-fetch the record from Amazon Kinesis. The spout sets the checkpoint to the highest sequence number that has been ack'ed (or exhausted retry attempts). -To use the spout, you'll need to add it to your Storm topology. +To use the spout, you'll need to add it to your Storm topology. + **KinesisSpout**: Constructs an instance of the spout, using your AWS credentials and the configuration specified in KinesisSpoutConfig (as well as com.amazonaws.ClientConfiguration, via the AWS SDK). Each task executed by the spout operates on a distinct set of Amazon Kinesis shards. Shard states are periodically committed to ZooKeeper. When the spout is deactivated, it will disconnect from ZooKeeper, but the spout will continue monitoring its local state so you can activate it again later. + **KinesisSpoutConfig**: Configures the spout, including the Storm topology name, the Amazon Kinesis stream name, the endpoint for connecting to ZooKeeper, and the prefix for the ZooKeeper paths where the spout state is stored. See the samples folder for configuration examples. @@ -28,7 +28,7 @@ The samples folder includes a sample topology and sample bolt, using the number 1. Edit the *.properties file to configure your Storm topology, Amazon Kinesis stream, and ZooKeeper details. For your AWS Credentials, we recommend using IAM roles on Amazon EC2 when possible. You can also specify your credentials using system properties, environment variables, or AwsCredentials.properties. 2. Package the spout and the sample (including all dependencies but excluding Storm itself) into one JAR file. -3. Deploy the package to Storm via the JAR file, e.g., `storm jar my-spout-sample.jar SampleTopology sample.properties RemoteMode` +3. Deploy the package to Storm via the JAR file, e.g., `storm jar my-spout-sample.jar SampleTopology sample.properties RemoteMode` ## Release Notes ### Release 1.1 (October 21, 2014) @@ -36,10 +36,6 @@ The samples folder includes a sample topology and sample bolt, using the number + Added region name support -### Future Work - -+ Handle closed, split, and merged shards - ## Related Resources [Amazon Kinesis Developer Guide](http://docs.aws.amazon.com/kinesis/latest/dev/introduction.html) diff --git a/pom.xml b/pom.xml index 5caad43..164a39d 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ kinesis-storm-spout jar Amazon Kinesis Storm Spout for Java - 1.1.0 + 1.2.0 The Amazon Kinesis Storm Spout helps Java developers integrate Amazon Kinesis with Storm. https://aws.amazon.com/kinesis @@ -30,8 +30,8 @@ 13.0-final 3.0 - - + + com.amazonaws diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/BufferedGetter.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/BufferedGetter.java index 86c1708..910715e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/BufferedGetter.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/BufferedGetter.java @@ -72,7 +72,7 @@ public Records getNext(int maxNumberOfRecords) { ensureBuffered(); if (!it.hasNext() && buffer.isEndOfShard()) { - return new Records(ImmutableList. of(), true); + return new Records(ImmutableList. of(), true, buffer.isReshard()); } ImmutableList.Builder recs = new ImmutableList.Builder<>(); @@ -94,7 +94,7 @@ public Records getNext(int maxNumberOfRecords) { } } - return new Records(recs.build(), false); + return new Records(recs.build(), false, buffer.isReshard()); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisHelper.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisHelper.java index 14f133d..10826ae 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisHelper.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisHelper.java @@ -151,7 +151,8 @@ private String addTruncatedShardList(final Map spoutShards, f for (Shard s : streamShards) { currShard = s.getShardId(); - spoutShards.put(s.getShardId(), new ShardInfo(s.getShardId())); + final boolean open = s.getSequenceNumberRange().getEndingSequenceNumber() == null; + spoutShards.put(s.getShardId(), new ShardInfo(s.getShardId(), open)); if (s.getParentShardId() != null && s.getAdjacentParentShardId() != null) { // It's a merge. Set both parents of the merge to merge into this shard. diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisShardGetter.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisShardGetter.java index 48f2988..75a11ef 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisShardGetter.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisShardGetter.java @@ -49,7 +49,7 @@ class KinesisShardGetter implements IShardGetter { private static final long BACKOFF_MILLIS = 500L; private final String streamName; - private final String shardId; + private final ShardInfo shard; private final AmazonKinesisClient kinesisClient; private String shardIterator; @@ -57,12 +57,12 @@ class KinesisShardGetter implements IShardGetter { /** * @param streamName Name of the Kinesis stream - * @param shardId Fetch data from this shard + * @param shard Fetch data from this shard * @param kinesisClient Kinesis client to use when making requests. */ - KinesisShardGetter(final String streamName, final String shardId, final AmazonKinesisClient kinesisClient) { + KinesisShardGetter(final String streamName, final ShardInfo shard, final AmazonKinesisClient kinesisClient) { this.streamName = streamName; - this.shardId = shardId; + this.shard = shard; this.kinesisClient = kinesisClient; this.shardIterator = ""; this.positionInShard = ShardPosition.end(); @@ -72,12 +72,12 @@ class KinesisShardGetter implements IShardGetter { public Records getNext(int maxNumberOfRecords) throws AmazonClientException, ResourceNotFoundException, InvalidArgumentException { if (shardIterator == null) { - LOG.debug(this + " Null shardIterator for " + shardId + ". This can happen if shard is closed."); + LOG.debug(this + " Null shardIterator for " + shard.getShardId() + ". This can happen if shard is closed."); return Records.empty(true); } - final ImmutableList.Builder records = new ImmutableList.Builder<>(); - + final ImmutableList.Builder records = new ImmutableList.Builder(); + try { final GetRecordsRequest request = new GetRecordsRequest(); request.setShardIterator(shardIterator); @@ -93,20 +93,22 @@ public Records getNext(int maxNumberOfRecords) + maxNumberOfRecords + ")."); } - shardIterator = result.getNextShardIterator(); + shardIterator = result.getNextShardIterator(); } catch (AmazonClientException e) { // We'll treat this equivalent to fetching 0 records - the spout drives the retry as part of nextTuple() // We don't sleep here - we can continue processing ack/fail on the spout thread. - LOG.error(this + "Caught exception when fetching records for " + shardId, e); + LOG.error(this + "Caught exception when fetching records for " + shard.getShardId(), e); } - return new Records(records.build(), shardIterator == null); + final boolean endOfShard = shardIterator == null; + final boolean reshard = endOfShard && shard.getShardOpen(); + return new Records(records.build(), endOfShard, reshard); } @Override public void seek(ShardPosition position) throws AmazonClientException, ResourceNotFoundException, InvalidSeekPositionException { - LOG.info("Seeking to " + position); + LOG.info(this + " Seeking to " + position + " for shard " + shard.getShardId()); ShardIteratorType iteratorType; String seqNum = null; @@ -145,12 +147,12 @@ public void seek(ShardPosition position) @Override public String getAssociatedShard() { - return shardId; + return shard.getShardId(); } @Override public String toString() { - return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("shardId", shardId).toString(); + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("shardId", shard.getShardId()).toString(); } private String seek(final ShardIteratorType iteratorType, final String seqNum) @@ -158,7 +160,7 @@ private String seek(final ShardIteratorType iteratorType, final String seqNum) final GetShardIteratorRequest request = new GetShardIteratorRequest(); request.setStreamName(streamName); - request.setShardId(shardId); + request.setShardId(shard.getShardId()); request.setShardIteratorType(iteratorType); // SeqNum is only set on {AT, AFTER}_SEQUENCE_NUMBER, so this is safe. diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisShardGetterBuilder.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisShardGetterBuilder.java index c289644..6f63450 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisShardGetterBuilder.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisShardGetterBuilder.java @@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.stormspout; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSortedMap; /** * Builds KinesisShardGetters. @@ -31,7 +32,7 @@ class KinesisShardGetterBuilder implements IShardGetterBuilder { /** * Constructor. - * + * * @param streamName Kinesis stream to create the getters in. * @param helper Used to get the AmazonKinesisClient object (used by the getters). */ @@ -47,12 +48,15 @@ public KinesisShardGetterBuilder(final String streamName, @Override public ImmutableList buildGetters(ImmutableList shardAssignment) { - ImmutableList.Builder builder = new ImmutableList.Builder<>(); + ImmutableList.Builder builder = new ImmutableList.Builder(); + ImmutableSortedMap shards = helper.getShardList(); - for (String shard : shardAssignment) { - builder.add(new BufferedGetter(new KinesisShardGetter(streamName, shard, helper.getSharedkinesisClient()), - maxRecordsPerCall, - emptyRecordListBackoffMillis)); + for (String shardId : shardAssignment) { + KinesisShardGetter getter = new KinesisShardGetter( + streamName, + shards.get(shardId), + helper.getSharedkinesisClient()); + builder.add(new BufferedGetter(getter, maxRecordsPerCall, emptyRecordListBackoffMillis)); } return builder.build(); diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisSpout.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisSpout.java index 7130026..066f3da 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisSpout.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisSpout.java @@ -36,35 +36,44 @@ import com.amazonaws.services.kinesis.stormspout.state.IKinesisSpoutStateManager; import com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperStateManager; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSortedMap; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; /** * Storm spout for Amazon Kinesis. The spout fetches data from Kinesis and emits a tuple for each data record. - * + * * Note: every spout task handles a distinct set of shards. */ public class KinesisSpout implements IRichSpout, Serializable { private static final long serialVersionUID = 7707829996758189836L; private static final Logger LOG = LoggerFactory.getLogger(KinesisSpout.class); - private final InitialPositionInStream initialPosition; - // Initialized before open + private final InitialPositionInStream initialPosition; + private final KinesisHelper shardListGetter; private final KinesisSpoutConfig config; - private final IShardListGetter shardListGetter; + private final long emptyRecordListSleepTimeMillis = 5L; + private final DeadLetterQueue deadLetterQueue; private final IShardGetterBuilder getterBuilder; - private long emptyRecordListSleepTimeMillis = 5L; // Initialized on open private transient SpoutOutputCollector collector; private transient TopologyContext context; - private transient IKinesisSpoutStateManager stateManager; + private transient ZookeeperStateManager stateManager; private transient long lastCommitTime; /** * Constructs an instance of the spout with just enough data to bootstrap the state from. * Construction done here is common to all spout tasks, whereas the IKinesisSpoutStateManager created * in activate() is task specific. - * + * * @param config Spout configuration. * @param credentialsProvider Used when making requests to Kinesis. * @param clientConfiguration Client configuration used when making calls to Kinesis. @@ -86,20 +95,6 @@ public KinesisSpout(KinesisSpoutConfig config, this.initialPosition = config.getInitialPositionInStream(); } - /** - * @param config Spout configuration. - * @param shardListGetter Used to list the shards in the stream. - * @param getterBuilder Used for creating shard getters for a task. - */ - KinesisSpout(final KinesisSpoutConfig config, - final IShardListGetter shardListGetter, - final IShardGetterBuilder getterBuilder) { - this.config = config; - this.shardListGetter = shardListGetter; - this.getterBuilder = getterBuilder; - this.initialPosition = config.getInitialPositionInStream(); - } - @Override public void open(@SuppressWarnings("rawtypes") final Map conf, final TopologyContext spoutContext, @@ -162,7 +157,7 @@ public void nextTuple() { String currentShardId = getter.getAssociatedShard(); Record rec = null; boolean isRetry = false; - + if (stateManager.shouldRetry(currentShardId)) { rec = stateManager.recordToRetry(currentShardId); if (LOG.isDebugEnabled()) { @@ -171,9 +166,14 @@ public void nextTuple() { } isRetry = true; } else { - final ImmutableList records = getter.getNext(1).getRecords(); - if ((records != null) && (!records.isEmpty())) { - rec = records.get(0); + final Records records = getter.getNext(1); + final ImmutableList recordList = records.getRecords(); + if ((recordList != null) && (!recordList.isEmpty())) { + rec = recordList.get(0); + } + if (records.isReshard()) { + LOG.info(this + " detected reshard event for shard " + currentShardId); + stateManager.handleReshard(); } } @@ -181,11 +181,8 @@ public void nextTuple() { // Copy record (ByteBuffer.duplicate()) so bolts in the same JVM don't affect the object (e.g. retries) Record recordToEmit = copyRecord(rec); List tuple = config.getScheme().deserialize(recordToEmit); - if (LOG.isDebugEnabled()) { - LOG.debug(this + " emitting record with seqnum " + recordToEmit.getSequenceNumber() + " from shard " - + currentShardId + "."); - } - + LOG.info(this + " emitting record with seqnum " + recordToEmit.getSequenceNumber() + " from shard " + + currentShardId + " with data: " + tuple); collector.emit(tuple, MessageIdUtil.constructMessageId(currentShardId, recordToEmit.getSequenceNumber())); stateManager.emit(currentShardId, recordToEmit, isRetry); } else { @@ -212,7 +209,7 @@ public void nextTuple() { /** * Creates a copy of the record so we don't get interference from bolts that execute in the same JVM. * We invoke ByteBuffer.duplicate() so the ByteBuffer state is decoupled. - * + * * @param record Kinesis record * @return Copied record. */ diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/Records.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/Records.java index 331389e..6ac3ad4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/Records.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/Records.java @@ -25,16 +25,19 @@ class Records { private final ImmutableList records; private final boolean endOfShard; + private final boolean reshard; /** * Constructor. * * @param records Kinesis records * @param endOfShard Did we reach the end of the shard? + * @param reshard Has a reshard (shard split or merge) event occurred? */ - Records(final ImmutableList records, final boolean endOfShard) { + Records(final ImmutableList records, final boolean endOfShard, final boolean reshard) { this.records = records; this.endOfShard = endOfShard; + this.reshard = reshard; } /** @@ -49,7 +52,7 @@ static Records empty() { * @return a new empty set of records for an open or closed shard. */ static Records empty(final boolean closed) { - return new Records(ImmutableList. of(), closed); + return new Records(ImmutableList. of(), closed, false); } /** @@ -66,6 +69,13 @@ boolean isEndOfShard() { return endOfShard; } + /** + * @return true if a reshard event was detected. + */ + boolean isReshard() { + return reshard; + } + /** * Does the Records instance contain records? * diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/ShardInfo.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/ShardInfo.java index c519485..8ecd6ae 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/ShardInfo.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/ShardInfo.java @@ -26,6 +26,7 @@ */ class ShardInfo { private final String shardId; + private final boolean open; private String mergesInto; private List splitsInto; @@ -34,8 +35,9 @@ class ShardInfo { * * @param shardId the Kinesis shard ID. */ - ShardInfo(String shardId) { + ShardInfo(String shardId, boolean open) { this.shardId = shardId; + this.open = open; this.mergesInto = ""; this.splitsInto = new ArrayList<>(2); } @@ -47,6 +49,11 @@ String getShardId() { return shardId; } + /** + * @return true iff the shard state is OPEN. + */ + boolean getShardOpen() { return open; } + /** * Define what the shard merges into. This is meant to be called at most once (not idempotent). * Cannot be called if addSplitsInto has been called. diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/InflightRecordTracker.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/InflightRecordTracker.java index 17864bd..bb49a2d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/InflightRecordTracker.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/InflightRecordTracker.java @@ -32,7 +32,7 @@ * while fetching records from Kinesis in batches. This class computes and tracks the checkpoint sequence number * based on messages that have been acked, failed, or are inflight. * We will checkpoint at the highest sequence number for a record which has been acked or retried up to the retry limit. - * + * * We temporarily buffer records which have not been acked or exhausted retries, so we can re-emit them upon failure * without having to refetch them from Kinesis. * Since acks/fails can arrive in an order different from the order in which the records were @@ -41,13 +41,13 @@ * acks in the order [125, 101, 132]. When we get the ack for 125, we keep keep it around. When we get an ack for 101, * we remove both 101 and 125 and set checkpointSequenceNumber to 125. When we get the ack for 132, we remove it and * set the checkpointSequenceNumber to 132. - * + * * Here's the table with info on processing acks. We maintain an ordered list of records (asc seq num). * In the table below we describe how we process an ack for a ("current") record based on the status of the previous * and next records in the ordered list (none means there is no previous/next record, keep means we don't delete * the current record). - * - * + * + * * +---------+-------------------------------+-------------------------------+-------------------------------+ * |Next---> | | | | * |Previous | acked | pending | none | @@ -61,7 +61,7 @@ * | none | delete, delete next, and set | delete and advance checkpoint | delete and advance checkpoint | * | | checkpoint to seq num of next | to seq num of current record | to seq num of current record | * +---------+-------------------------------+-------------------------------+-------------------------------+ - * + * */ // @formatter:on class InflightRecordTracker { @@ -123,7 +123,7 @@ void onEmit(final Record record, boolean isRetry) { } if (LOG.isDebugEnabled()) { LOG.debug("Shard " + shardId + ": Recorded emit for seq num " + sequenceNumber + ", isRetry = " + isRetry - + ", retryNum = " + retryNum); + + ", retryNum = " + retryNum); } } else { if (LOG.isDebugEnabled()) { @@ -216,9 +216,22 @@ Record recordToRetry() { return recordToRetry; } + Record getInflightRecord(final String sequenceNumber) { + RecordNode node = seqNumToRecordInfoMap.get(sequenceNumber); + LOG.info(this + " getInflightRecord: node = " + node + ", sequenceNumber = " + sequenceNumber + ", shardId = " + shardId + ", checkpointSequenceNumber = " + checkpointSequenceNumber); + Record record = node.getRecord(); + return record; + } + + Record getEarliestInflightRecord() { + if (recordNodeList.size() > 0) + return recordNodeList.getFirst().getRecord(); + return null; + } + /** * Note: This has package level access solely for testing purposes. - * + * * @return List of in-flight records */ RecordNodeList getRecordNodeList() { @@ -227,7 +240,7 @@ RecordNodeList getRecordNodeList() { /** * Note: This method has package level access solely for testing purposes. - * + * * @return SequenceNumber->RecordNode map of in-flight records. */ Map getSequenceNumberToRecordNodeMap() { @@ -236,7 +249,7 @@ Map getSequenceNumberToRecordNodeMap() { /** * Note: This method has package level access solely for testing purposes. - * + * * @return Queue of records to retry */ Queue getRetryQueue() { @@ -330,7 +343,7 @@ class RecordNodeList { /** * Adds record to the end of the list. - * + * * @param record Record will be added to end of the list * @return Newly added node * @throws IllegalArgumentException Thrown if the sequence number of the record is lower than the (current) last diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/LocalShardState.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/LocalShardState.java index ef1c460..6dacaee 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/LocalShardState.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/LocalShardState.java @@ -36,7 +36,7 @@ class LocalShardState { /** * Constructor. - * + * * @param shardId ID of the shard this LocalShardState is tracking. * @param latestZookeeperSeqNum the last checkpoint stored in Zookeeper. * @param recordRetryLimit Number of times a failed record should be retried. @@ -67,7 +67,7 @@ void ack(final String seqNum) { tracker.onAck(seqNum); } - /** + /** * Call when a record is failed. It is then added to a retry queue that is queried by * nextTuple(). * @@ -88,6 +88,14 @@ Record recordToRetry() { return tracker.recordToRetry(); } + Record getInflightRecord(final String seqNum) { + return tracker.getInflightRecord(seqNum); + } + + Record getEarliestInflightRecord() { + return tracker.getEarliestInflightRecord(); + } + /** * @return true if there are sequence numbers that need to be retried. */ diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/ZookeeperShardState.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/ZookeeperShardState.java index a58bf4f..c5edf64 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/ZookeeperShardState.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/ZookeeperShardState.java @@ -96,9 +96,17 @@ public byte[] initialize() { @Override public Mod apply(byte[] x) { - // At this point, we don't support resharding. We assume the shard list is valid if one exists. - LOG.info("ShardList already initialized in Zookeeper. Assuming it is valid."); - return Mod.noModification(); + // Implies that every spout will attempt to update the ZK shardList when a reshard occurs. + LOG.info(this + " Re-initialization of shardList: " + shards); + ShardListV0 shardList = new ShardListV0(shards); + ObjectMapper objectMapper = new ObjectMapper(); + byte[] data; + try { + data = objectMapper.writeValueAsBytes(shardList); + } catch (JsonProcessingException e) { + throw new KinesisSpoutException("Unable to serialize shardList " + shardList, e); + } + return Mod.modification(data); } }; @@ -235,8 +243,8 @@ private boolean tryAtomicUpdate(final String pathSuffix, final NodeFunction f) t if (stat == null) { try { - zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) - .forPath(path, f.initialize()); + LOG.info("creating ZK data at path " + path); + zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, f.initialize()); } catch (KeeperException.NodeExistsException e) { LOG.debug("Concurrent creation of " + path + ", retrying", e); return false; @@ -246,6 +254,7 @@ private boolean tryAtomicUpdate(final String pathSuffix, final NodeFunction f) t if (newVal.hasModification()) { try { + LOG.info("updating ZK data at path " + path); zk.setData().withVersion(stat.getVersion()).forPath(path, newVal.get()); } catch (KeeperException.BadVersionException e) { LOG.debug("Concurrent update to " + path + ", retrying.", e); diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/ZookeeperStateManager.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/ZookeeperStateManager.java index 344d02b..7a0b35b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/ZookeeperStateManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/ZookeeperStateManager.java @@ -50,17 +50,21 @@ public class ZookeeperStateManager implements Watcher, IKinesisSpoutStateManager { private static final Logger LOG = LoggerFactory.getLogger(ZookeeperStateManager.class); + // initialized at construction private final KinesisSpoutConfig config; + private final ShardPosition seekToOnOpen; private final IShardListGetter shardListGetter; private final IShardGetterBuilder getterBuilder; - private final ShardPosition seekToOnOpen; + // updated at activate and deactivate private ZookeeperShardState zk; + private boolean active; + + // updated at rebalance private int taskIndex; private int totalNumTasks; - private boolean active; - private ImmutableList getters; + // initialized at bootstrap private Iterator currentGetter; private Map shardStates; @@ -96,29 +100,34 @@ private ShardPosition getShardPosition(InitialPositionInStream initialPosition) return position; } - /* (non-Javadoc) - * @see com.amazonaws.services.kinesis.stormspout.state.IKinesisSpoutStateManager#activate() + /** + * Ensure that the task can safely be activated + * This will take care of making sure the list is sorted too. */ - @Override - public void activate() { - this.zk = new ZookeeperShardState(config); - this.active = true; - - // Ensure that the task can safely be activated - // This will take care of making sure the list is sorted too. + public void initialize() { ImmutableList shardList = ImmutableList.copyOf(shardListGetter.getShardList().keySet()); LOG.info(this + "Activating with shardList " + shardList); try { zk.initialize(shardList); - // Hook shardList watcher for the first time. zk.watchShardList(this); } catch (Exception e) { LOG.error(this + " something went wrong while initializing Zookeeper shardList." - + " Assuming it is unsafe to continue.", e); + + " Assuming it is unsafe to continue.", e); throw new KinesisSpoutException(e); } } + + /* (non-Javadoc) + * @see com.amazonaws.services.kinesis.stormspout.state.IKinesisSpoutStateManager#activate() + */ + @Override + public void activate() { + this.zk = new ZookeeperShardState(config); + this.active = true; + initialize(); + } + /* (non-Javadoc) * @see com.amazonaws.services.kinesis.stormspout.state.IKinesisSpoutStateManager#deactivate() */ @@ -212,7 +221,7 @@ public void emit(final String shardId, final Record record, boolean isRetry) { public boolean shouldRetry(final String shardId) { return safeGetShardState(shardId).shouldRetry(); } - + /* (non-Javadoc) * @see com.amazonaws.services.kinesis.stormspout.state.IKinesisSpoutStateManager#recordToRetry(java.lang.String) */ @@ -262,6 +271,8 @@ public void commitShardStates() { // They are all synchronized on the instance of this class. @Override public synchronized void process(WatchedEvent event) { + LOG.debug(this + " watched event occurred: " + event); + checkState(active, "Cannot process events if state is not active (a ZK" + " connection is necessary)."); @@ -286,12 +297,18 @@ public synchronized void process(WatchedEvent event) { if (event.getType() == EventType.NodeDataChanged && zk.isShardList(event.getPath())) { LOG.info(this + " detected change in shardList. Committing current shard state and " + "reinitializing spout task from ZK."); - commitShardStates(); bootstrapStateFromZookeeper(); } } + /** + * Will re-initialize the shard list, causing the watcher to trigger and detect the change in the shard list. + */ + public void handleReshard() { + initialize(); + } + @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) @@ -303,37 +320,59 @@ public String toString() { // and the getters from whatever data is in Zookeeper. private void bootstrapStateFromZookeeper() { ImmutableList shardAssignment = getShardAssignment(); + ImmutableList getters; // Task could not get an assignment (e.g. there are too many tasks for too few shards). if (shardAssignment.isEmpty()) { - this.shardStates = new HashMap<>(); - this.getters = ImmutableList.of(); + shardStates = new HashMap(); + getters = ImmutableList.of(); } else { - this.shardStates = makeLocalState(shardAssignment); - this.getters = makeGetters(shardAssignment); + updateLocalState(shardAssignment); + getters = makeGetters(shardAssignment); } this.currentGetter = Iterators.cycle(getters); LOG.info(this + " got getter assignment. Handling " + getters + "."); } - // Create the local shard state from Zookeeper. - private Map makeLocalState(ImmutableList shardAssignment) { - Map state = new HashMap<>(); + // Create/update the local shard state from Zookeeper. + private void updateLocalState(ImmutableList shardAssignment) { + // first initialization of shardStates + if (shardStates == null) { + shardStates = new HashMap(); + } + + // remove shard state that we're not longer responsible for + Iterator> iter = shardStates.entrySet().iterator(); + while (iter.hasNext()) { + Entry entry = iter.next(); + final String shardId = entry.getKey(); + if (!shardAssignment.contains(shardId)) { + LOG.info(this + " removing stale shard state for shard " + shardId); + iter.remove(); + } + } for (final String shardId : shardAssignment) { + // first check for pre-existing shard state... + if (shardStates.containsKey(shardId)) { + final LocalShardState st = shardStates.get(shardId); + LOG.info(this + " keeping existing shard state for shard " + shardId + " with earliest inflight seqnum " + st.getEarliestInflightRecord()); + continue; + } + + // otherwise fallback to zk committed shard state String latestValidSeqNum; try { latestValidSeqNum = zk.getLastCommittedSeqNum(shardId); + LOG.info(this + " fell back to zk record with seqnum (" + latestValidSeqNum + ") for shard " + shardId); } catch (Exception e) { - LOG.error(this + " could not retrieve last committed seqnum for " + shardId - + " from ZooKeeper. Starting from default getter position."); latestValidSeqNum = ""; + LOG.error(this + " could not retrieve last committed seqnum for " + shardId + + " from ZooKeeper. Starting from default getter position."); } - state.put(shardId, new LocalShardState(shardId, latestValidSeqNum, config.getRecordRetryLimit())); + shardStates.put(shardId, new LocalShardState(shardId, latestValidSeqNum, config.getRecordRetryLimit())); } - - return state; } // Opens getters based on shard assignment and local shard state, and seeks them to seekToOnOpen. @@ -348,11 +387,24 @@ private ImmutableList makeGetters(ImmutableList shardAssig final LocalShardState shardState = safeGetShardState(shardId); try { - if (shardState.getLatestValidSeqNum().isEmpty() && seekToOnOpen != null) { - getter.seek(seekToOnOpen); + if (shardState.getEarliestInflightRecord() != null) { + // seek getter to earliest inflight record + if (LOG.isDebugEnabled()) { + LOG.debug(this + " seeking to early inflight " + shardState.getEarliestInflightRecord().getSequenceNumber()); + } + getter.seek(ShardPosition.afterSequenceNumber(shardState.getEarliestInflightRecord().getSequenceNumber())); } else if (!shardState.getLatestValidSeqNum().isEmpty()) { - getter.seek(ShardPosition.afterSequenceNumber( - shardState.getLatestValidSeqNum())); + // we fell back to zk state, seek shard state to appropriate position + if (LOG.isDebugEnabled()) { + LOG.debug(this + " seeking to committed " + shardState.getLatestValidSeqNum()); + } + getter.seek(ShardPosition.afterSequenceNumber(shardState.getLatestValidSeqNum())); + } else if (seekToOnOpen != null) { + if (LOG.isDebugEnabled()) { + LOG.debug(this + " seeking to default " + seekToOnOpen); + } + // no pre-existing shard state or valid zk state, seek to default + getter.seek(seekToOnOpen); } } catch (InvalidSeekPositionException e) { LOG.error(this + " tried to seek getter " + getter + " to an invalid position.", e);