diff --git a/.gitignore b/.gitignore index ffa4e66..612013c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ target/ AwsCredentials.properties +.idea +**.iml \ No newline at end of file diff --git a/pom.xml b/pom.xml index c9cd980..74bbfc4 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ kinesis-storm-spout jar Amazon Kinesis Storm Spout for Java - 1.1.1 + 1.2.1-SNAPSHOT The Amazon Kinesis Storm Spout helps Java developers integrate Amazon Kinesis with Storm. https://aws.amazon.com/kinesis @@ -24,11 +24,14 @@ - 1.7.13 - 0.9.2-incubating - 1.1.3 - 13.0-final - 3.0 + UTF-8 + + 1.11.50 + 1.0.2 + 3.2.0 + 20.0 + 3.5 + 2.24.0 @@ -55,10 +58,15 @@ ${storm.version} - com.netflix.curator + org.apache.curator curator-framework ${curator-framework.version} + + com.esotericsoftware.kryo + kryo + ${kryo.version} + diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/BufferedGetter.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/BufferedGetter.java index 86c1708..80d6d13 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/BufferedGetter.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/BufferedGetter.java @@ -15,14 +15,13 @@ package com.amazonaws.services.kinesis.stormspout; -import java.util.Iterator; - -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; - import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.stormspout.exceptions.InvalidSeekPositionException; import com.google.common.collect.ImmutableList; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + +import java.util.Iterator; /** * Allows users to do efficient getter.getNext(1) calls in exchange for maybe pulling @@ -40,27 +39,27 @@ class BufferedGetter implements IShardGetter { /** * Creates a (shard) getter that buffers records. - * - * @param underlyingGetter Unbuffered shard getter. - * @param maxBufferSize Max number of records to fetch from the underlying getter. + * + * @param underlyingGetter Unbuffered shard getter. + * @param maxBufferSize Max number of records to fetch from the underlying getter. * @param emptyRecordListBackoffMillis Backoff time between GetRecords calls if previous call fetched no records. */ public BufferedGetter(final IShardGetter underlyingGetter, final int maxBufferSize, final long emptyRecordListBackoffMillis) { this(underlyingGetter, maxBufferSize, emptyRecordListBackoffMillis, new TimeProvider()); } - + /** * Used for unit testing. - * - * @param underlyingGetter Unbuffered shard getter - * @param maxBufferSize Max number of records to fetch from the underlying getter + * + * @param underlyingGetter Unbuffered shard getter + * @param maxBufferSize Max number of records to fetch from the underlying getter * @param emptyRecordListBackoffMillis Backoff time between GetRecords calls if previous call fetched no records. - * @param timeProvider Useful for testing timing based behavior (e.g. backoff) + * @param timeProvider Useful for testing timing based behavior (e.g. backoff) */ BufferedGetter(final IShardGetter underlyingGetter, - final int maxBufferSize, - final long emptyRecordListBackoffMillis, - final TimeProvider timeProvider) { + final int maxBufferSize, + final long emptyRecordListBackoffMillis, + final TimeProvider timeProvider) { this.getter = underlyingGetter; this.maxBufferSize = maxBufferSize; this.emptyRecordListBackoffTime = emptyRecordListBackoffMillis; @@ -72,7 +71,7 @@ public Records getNext(int maxNumberOfRecords) { ensureBuffered(); if (!it.hasNext() && buffer.isEndOfShard()) { - return new Records(ImmutableList. of(), true); + return new Records(ImmutableList.of(), true, buffer.isReshard()); } ImmutableList.Builder recs = new ImmutableList.Builder<>(); @@ -94,7 +93,7 @@ public Records getNext(int maxNumberOfRecords) { } } - return new Records(recs.build(), false); + return new Records(recs.build(), false, buffer.isReshard()); } @Override @@ -132,8 +131,8 @@ private void rebuffer() { } } } - - /** + + /** * Time provider - helpful for unit tests of BufferedGetter. */ static class TimeProvider { diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/DefaultKinesisRecordScheme.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/DefaultKinesisRecordScheme.java index 79d8309..727f646 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/DefaultKinesisRecordScheme.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/DefaultKinesisRecordScheme.java @@ -15,13 +15,12 @@ package com.amazonaws.services.kinesis.stormspout; +import com.amazonaws.services.kinesis.model.Record; +import org.apache.storm.tuple.Fields; + import java.util.ArrayList; import java.util.List; -import backtype.storm.tuple.Fields; - -import com.amazonaws.services.kinesis.model.Record; - /** * Default scheme for emitting Kinesis records as tuples. It emits a tuple of (partitionKey, record). */ diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/IKinesisRecordScheme.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/IKinesisRecordScheme.java index 5c39278..e85f9f7 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/IKinesisRecordScheme.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/IKinesisRecordScheme.java @@ -15,11 +15,10 @@ package com.amazonaws.services.kinesis.stormspout; -import java.util.List; - -import backtype.storm.tuple.Fields; - import com.amazonaws.services.kinesis.model.Record; +import org.apache.storm.tuple.Fields; + +import java.util.List; /** * Used to convert Kinesis record into a tuple. diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisHelper.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisHelper.java index cea9395..dd720d2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisHelper.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisHelper.java @@ -15,19 +15,10 @@ package com.amazonaws.services.kinesis.stormspout; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.amazonaws.AmazonClientException; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.regions.Region; -import com.amazonaws.regions.Regions; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.model.DescribeStreamRequest; import com.amazonaws.services.kinesis.model.DescribeStreamResult; @@ -35,6 +26,13 @@ import com.amazonaws.services.kinesis.stormspout.utils.InfiniteConstantBackoffRetry; import com.amazonaws.services.kinesis.stormspout.utils.ShardIdComparator; import com.google.common.collect.ImmutableSortedMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; /** * Helper class to fetch the shard list from Kinesis, create Kinesis client objects, etc. @@ -58,14 +56,14 @@ class KinesisHelper implements IShardListGetter { private transient Region region; /** - * @param streamName Kinesis stream name to interact with. + * @param streamName Kinesis stream name to interact with. * @param kinesisCredsProvider Credentials for authentication with Kinesis. - * @param kinesisClientConfig Configuration for the Kinesis client. + * @param kinesisClientConfig Configuration for the Kinesis client. */ KinesisHelper(final String streamName, - final AWSCredentialsProvider kinesisCredsProvider, - final ClientConfiguration kinesisClientConfig, - final Region region) { + final AWSCredentialsProvider kinesisCredsProvider, + final ClientConfiguration kinesisClientConfig, + final Region region) { this.streamName = streamName; this.serializedKinesisCredsProvider = SerializationHelper.kryoSerializeObject(kinesisCredsProvider); this.serializedkinesisClientConfig = SerializationHelper.kryoSerializeObject(kinesisClientConfig); @@ -107,21 +105,21 @@ public ImmutableSortedMap getShardList() { private DescribeStreamResult getDescribeStreamResult(final DescribeStreamRequest request) { return new InfiniteConstantBackoffRetry(BACKOFF_MILLIS, AmazonClientException.class, new Callable() { - public DescribeStreamResult call() throws Exception { - DescribeStreamResult result = getSharedkinesisClient().describeStream(request); - return result; - } - }).call(); + public DescribeStreamResult call() throws Exception { + DescribeStreamResult result = getSharedkinesisClient().describeStream(request); + return result; + } + }).call(); } /** * @return new instance of AmazonKinesisClient, with parameters supplied by whatever was passed - * to the KinesisHelper constructor. + * to the KinesisHelper constructor. */ private AmazonKinesisClient makeNewKinesisClient() { AmazonKinesisClient client = new AmazonKinesisClient(getKinesisCredsProvider(), getClientConfiguration()); LOG.info("Using " + getRegion().getName() + " region"); - client.setRegion(getRegion()); + client.setRegion(getRegion()); return client; } @@ -165,7 +163,8 @@ private String addTruncatedShardList(final Map spoutShards, f for (Shard s : streamShards) { currShard = s.getShardId(); - spoutShards.put(s.getShardId(), new ShardInfo(s.getShardId())); + final boolean open = s.getSequenceNumberRange().getEndingSequenceNumber() == null; + spoutShards.put(s.getShardId(), new ShardInfo(s.getShardId(), open)); if (s.getParentShardId() != null && s.getAdjacentParentShardId() != null) { // It's a merge. Set both parents of the merge to merge into this shard. diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisShardGetter.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisShardGetter.java index 48f2988..e892cf9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisShardGetter.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisShardGetter.java @@ -15,30 +15,19 @@ package com.amazonaws.services.kinesis.stormspout; -import java.util.concurrent.Callable; - -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.amazonaws.AmazonClientException; -import com.amazonaws.AmazonServiceException; import com.amazonaws.services.kinesis.AmazonKinesisClient; -import com.amazonaws.services.kinesis.model.ExpiredIteratorException; -import com.amazonaws.services.kinesis.model.GetRecordsRequest; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; -import com.amazonaws.services.kinesis.model.GetShardIteratorResult; -import com.amazonaws.services.kinesis.model.InvalidArgumentException; -import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.model.ResourceNotFoundException; -import com.amazonaws.services.kinesis.model.ShardIteratorType; +import com.amazonaws.services.kinesis.model.*; import com.amazonaws.services.kinesis.stormspout.exceptions.InvalidSeekPositionException; import com.amazonaws.services.kinesis.stormspout.exceptions.KinesisSpoutException; import com.amazonaws.services.kinesis.stormspout.utils.InfiniteConstantBackoffRetry; import com.google.common.collect.ImmutableList; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; /** * Fetches data from a Kinesis shard. @@ -49,20 +38,20 @@ class KinesisShardGetter implements IShardGetter { private static final long BACKOFF_MILLIS = 500L; private final String streamName; - private final String shardId; + private final ShardInfo shard; private final AmazonKinesisClient kinesisClient; private String shardIterator; private ShardPosition positionInShard; /** - * @param streamName Name of the Kinesis stream - * @param shardId Fetch data from this shard + * @param streamName Name of the Kinesis stream + * @param shard Fetch data from this shard * @param kinesisClient Kinesis client to use when making requests. */ - KinesisShardGetter(final String streamName, final String shardId, final AmazonKinesisClient kinesisClient) { + KinesisShardGetter(final String streamName, final ShardInfo shard, final AmazonKinesisClient kinesisClient) { this.streamName = streamName; - this.shardId = shardId; + this.shard = shard; this.kinesisClient = kinesisClient; this.shardIterator = ""; this.positionInShard = ShardPosition.end(); @@ -70,14 +59,14 @@ class KinesisShardGetter implements IShardGetter { @Override public Records getNext(int maxNumberOfRecords) - throws AmazonClientException, ResourceNotFoundException, InvalidArgumentException { + throws AmazonClientException, ResourceNotFoundException, InvalidArgumentException { if (shardIterator == null) { - LOG.debug(this + " Null shardIterator for " + shardId + ". This can happen if shard is closed."); + LOG.debug(this + " Null shardIterator for " + shard.getShardId() + ". This can happen if shard is closed."); return Records.empty(true); } - final ImmutableList.Builder records = new ImmutableList.Builder<>(); - + final ImmutableList.Builder records = new ImmutableList.Builder(); + try { final GetRecordsRequest request = new GetRecordsRequest(); request.setShardIterator(shardIterator); @@ -93,20 +82,22 @@ public Records getNext(int maxNumberOfRecords) + maxNumberOfRecords + ")."); } - shardIterator = result.getNextShardIterator(); + shardIterator = result.getNextShardIterator(); } catch (AmazonClientException e) { // We'll treat this equivalent to fetching 0 records - the spout drives the retry as part of nextTuple() // We don't sleep here - we can continue processing ack/fail on the spout thread. - LOG.error(this + "Caught exception when fetching records for " + shardId, e); + LOG.error(this + "Caught exception when fetching records for " + shard.getShardId(), e); } - return new Records(records.build(), shardIterator == null); + final boolean endOfShard = shardIterator == null; + final boolean reshard = endOfShard && shard.getShardOpen(); + return new Records(records.build(), endOfShard, reshard); } @Override public void seek(ShardPosition position) - throws AmazonClientException, ResourceNotFoundException, InvalidSeekPositionException { - LOG.info("Seeking to " + position); + throws AmazonClientException, ResourceNotFoundException, InvalidSeekPositionException { + LOG.info(this + " Seeking to " + position + " for shard " + shard.getShardId()); ShardIteratorType iteratorType; String seqNum = null; @@ -145,20 +136,20 @@ public void seek(ShardPosition position) @Override public String getAssociatedShard() { - return shardId; + return shard.getShardId(); } @Override public String toString() { - return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("shardId", shardId).toString(); + return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("shardId", shard.getShardId()).toString(); } private String seek(final ShardIteratorType iteratorType, final String seqNum) - throws AmazonClientException, ResourceNotFoundException, InvalidArgumentException { + throws AmazonClientException, ResourceNotFoundException, InvalidArgumentException { final GetShardIteratorRequest request = new GetShardIteratorRequest(); request.setStreamName(streamName); - request.setShardId(shardId); + request.setShardId(shard.getShardId()); request.setShardIteratorType(iteratorType); // SeqNum is only set on {AT, AFTER}_SEQUENCE_NUMBER, so this is safe. @@ -178,7 +169,7 @@ public String call() throws Exception { } private GetRecordsResult safeGetRecords(final GetRecordsRequest request) - throws AmazonClientException, ResourceNotFoundException, InvalidArgumentException { + throws AmazonClientException, ResourceNotFoundException, InvalidArgumentException { while (true) { try { return kinesisClient.getRecords(request); diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisShardGetterBuilder.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisShardGetterBuilder.java index c289644..25e6da1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisShardGetterBuilder.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisShardGetterBuilder.java @@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.stormspout; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSortedMap; /** * Builds KinesisShardGetters. @@ -31,14 +32,14 @@ class KinesisShardGetterBuilder implements IShardGetterBuilder { /** * Constructor. - * + * * @param streamName Kinesis stream to create the getters in. - * @param helper Used to get the AmazonKinesisClient object (used by the getters). + * @param helper Used to get the AmazonKinesisClient object (used by the getters). */ public KinesisShardGetterBuilder(final String streamName, - final KinesisHelper helper, - final int maxRecordsPerCall, - final long emptyRecordListBackoffMillis) { + final KinesisHelper helper, + final int maxRecordsPerCall, + final long emptyRecordListBackoffMillis) { this.streamName = streamName; this.helper = helper; this.maxRecordsPerCall = maxRecordsPerCall; @@ -47,12 +48,15 @@ public KinesisShardGetterBuilder(final String streamName, @Override public ImmutableList buildGetters(ImmutableList shardAssignment) { - ImmutableList.Builder builder = new ImmutableList.Builder<>(); + ImmutableList.Builder builder = new ImmutableList.Builder(); + ImmutableSortedMap shards = helper.getShardList(); - for (String shard : shardAssignment) { - builder.add(new BufferedGetter(new KinesisShardGetter(streamName, shard, helper.getSharedkinesisClient()), - maxRecordsPerCall, - emptyRecordListBackoffMillis)); + for (String shardId : shardAssignment) { + KinesisShardGetter getter = new KinesisShardGetter( + streamName, + shards.get(shardId), + helper.getSharedkinesisClient()); + builder.add(new BufferedGetter(getter, maxRecordsPerCall, emptyRecordListBackoffMillis)); } return builder.build(); diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisSpout.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisSpout.java index 7130026..a56f1aa 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisSpout.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/KinesisSpout.java @@ -15,68 +15,66 @@ package com.amazonaws.services.kinesis.stormspout; -import java.io.Serializable; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.Config; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichSpout; -import backtype.storm.topology.OutputFieldsDeclarer; - import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.stormspout.state.IKinesisSpoutStateManager; import com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperStateManager; import com.google.common.collect.ImmutableList; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.storm.Config; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; /** * Storm spout for Amazon Kinesis. The spout fetches data from Kinesis and emits a tuple for each data record. - * + *

* Note: every spout task handles a distinct set of shards. */ public class KinesisSpout implements IRichSpout, Serializable { private static final long serialVersionUID = 7707829996758189836L; private static final Logger LOG = LoggerFactory.getLogger(KinesisSpout.class); - - private final InitialPositionInStream initialPosition; + public static final String TOPOLOGY_NAME_MDC_KEY = "component"; // Initialized before open + private final InitialPositionInStream initialPosition; + private final KinesisHelper shardListGetter; private final KinesisSpoutConfig config; - private final IShardListGetter shardListGetter; + private final long emptyRecordListSleepTimeMillis = 5L; private final IShardGetterBuilder getterBuilder; - private long emptyRecordListSleepTimeMillis = 5L; // Initialized on open private transient SpoutOutputCollector collector; private transient TopologyContext context; - private transient IKinesisSpoutStateManager stateManager; + private transient ZookeeperStateManager stateManager; private transient long lastCommitTime; /** * Constructs an instance of the spout with just enough data to bootstrap the state from. * Construction done here is common to all spout tasks, whereas the IKinesisSpoutStateManager created * in activate() is task specific. - * - * @param config Spout configuration. + * + * @param config Spout configuration. * @param credentialsProvider Used when making requests to Kinesis. * @param clientConfiguration Client configuration used when making calls to Kinesis. */ public KinesisSpout(KinesisSpoutConfig config, - AWSCredentialsProvider credentialsProvider, - ClientConfiguration clientConfiguration) { + AWSCredentialsProvider credentialsProvider, + ClientConfiguration clientConfiguration) { this.config = config; KinesisHelper helper = new KinesisHelper(config.getStreamName(), - credentialsProvider, - clientConfiguration, - config.getRegion()); + credentialsProvider, + clientConfiguration, + config.getRegion()); this.shardListGetter = helper; this.getterBuilder = new KinesisShardGetterBuilder(config.getStreamName(), @@ -86,35 +84,23 @@ public KinesisSpout(KinesisSpoutConfig config, this.initialPosition = config.getInitialPositionInStream(); } - /** - * @param config Spout configuration. - * @param shardListGetter Used to list the shards in the stream. - * @param getterBuilder Used for creating shard getters for a task. - */ - KinesisSpout(final KinesisSpoutConfig config, - final IShardListGetter shardListGetter, - final IShardGetterBuilder getterBuilder) { - this.config = config; - this.shardListGetter = shardListGetter; - this.getterBuilder = getterBuilder; - this.initialPosition = config.getInitialPositionInStream(); - } - @Override public void open(@SuppressWarnings("rawtypes") final Map conf, - final TopologyContext spoutContext, - final SpoutOutputCollector spoutCollector) { + final TopologyContext spoutContext, + final SpoutOutputCollector spoutCollector) { config.setTopologyName((String) conf.get(Config.TOPOLOGY_NAME)); this.context = spoutContext; this.collector = spoutCollector; this.stateManager = new ZookeeperStateManager(config, shardListGetter, getterBuilder, initialPosition); + MDC.put(TOPOLOGY_NAME_MDC_KEY, (String) conf.get(Config.TOPOLOGY_NAME)); LOG.info(this + " open() called with topoConfig task index " + spoutContext.getThisTaskIndex() + " for processing stream " + config.getStreamName()); } @Override public void close() { + MDC.remove(TOPOLOGY_NAME_MDC_KEY); } @Override @@ -162,7 +148,7 @@ public void nextTuple() { String currentShardId = getter.getAssociatedShard(); Record rec = null; boolean isRetry = false; - + if (stateManager.shouldRetry(currentShardId)) { rec = stateManager.recordToRetry(currentShardId); if (LOG.isDebugEnabled()) { @@ -171,9 +157,14 @@ public void nextTuple() { } isRetry = true; } else { - final ImmutableList records = getter.getNext(1).getRecords(); - if ((records != null) && (!records.isEmpty())) { - rec = records.get(0); + final Records records = getter.getNext(1); + final ImmutableList recordList = records.getRecords(); + if ((recordList != null) && (!recordList.isEmpty())) { + rec = recordList.get(0); + } + if (records.isReshard()) { + LOG.info(this + " detected reshard event for shard " + currentShardId); + stateManager.handleReshard(); } } @@ -181,11 +172,8 @@ public void nextTuple() { // Copy record (ByteBuffer.duplicate()) so bolts in the same JVM don't affect the object (e.g. retries) Record recordToEmit = copyRecord(rec); List tuple = config.getScheme().deserialize(recordToEmit); - if (LOG.isDebugEnabled()) { - LOG.debug(this + " emitting record with seqnum " + recordToEmit.getSequenceNumber() + " from shard " - + currentShardId + "."); - } - + LOG.info(this + " emitting record with seqnum " + recordToEmit.getSequenceNumber() + " from shard " + + currentShardId + " with data: " + tuple); collector.emit(tuple, MessageIdUtil.constructMessageId(currentShardId, recordToEmit.getSequenceNumber())); stateManager.emit(currentShardId, recordToEmit, isRetry); } else { @@ -212,7 +200,7 @@ public void nextTuple() { /** * Creates a copy of the record so we don't get interference from bolts that execute in the same JVM. * We invoke ByteBuffer.duplicate() so the ByteBuffer state is decoupled. - * + * * @param record Kinesis record * @return Copied record. */ @@ -221,6 +209,7 @@ private Record copyRecord(Record record) { duplicate.setPartitionKey(record.getPartitionKey()); duplicate.setSequenceNumber(record.getSequenceNumber()); duplicate.setData(record.getData().duplicate()); + duplicate.setApproximateArrivalTimestamp(record.getApproximateArrivalTimestamp()); return duplicate; } diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/Records.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/Records.java index 331389e..0250fa6 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/Records.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/Records.java @@ -25,16 +25,18 @@ class Records { private final ImmutableList records; private final boolean endOfShard; + private final boolean reshard; /** * Constructor. - * - * @param records Kinesis records + * + * @param records Kinesis records * @param endOfShard Did we reach the end of the shard? */ - Records(final ImmutableList records, final boolean endOfShard) { + Records(final ImmutableList records, final boolean endOfShard, final boolean reshard) { this.records = records; this.endOfShard = endOfShard; + this.reshard = reshard; } /** @@ -49,7 +51,7 @@ static Records empty() { * @return a new empty set of records for an open or closed shard. */ static Records empty(final boolean closed) { - return new Records(ImmutableList. of(), closed); + return new Records(ImmutableList.of(), closed, false); } /** @@ -66,9 +68,16 @@ boolean isEndOfShard() { return endOfShard; } + /** + * @return true if a reshard event was detected. + */ + boolean isReshard() { + return reshard; + } + /** * Does the Records instance contain records? - * + * * @return true if getRecords() has records. */ boolean isEmpty() { diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/SerializationHelper.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/SerializationHelper.java index 06a4931..345e6d8 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/SerializationHelper.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/SerializationHelper.java @@ -15,17 +15,16 @@ package com.amazonaws.services.kinesis.stormspout; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.nio.ByteBuffer; - +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; import org.objenesis.strategy.StdInstantiatorStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; /** * Utility class for serialization and deserialization. @@ -34,12 +33,14 @@ class SerializationHelper { private static final Logger LOG = LoggerFactory.getLogger(SerializationHelper.class); // Utility class should not be instantiated. - private SerializationHelper() { } + private SerializationHelper() { + } /** * If the ByteBuffer is backed by an array, return this array. Otherwise, dump the ByteBuffer * to a byte array (the latter is an expensive operation). - * @param buf buffer to read from. + * + * @param buf buffer to read from. * @return data or copy of data in buf as a byte array. */ public static byte[] copyData(ByteBuffer buf) { @@ -59,7 +60,8 @@ public static byte[] kryoSerializeObject(final Object obj) { final ByteArrayOutputStream os = new ByteArrayOutputStream(); final Output output = new Output(os); - kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); + // Better initialization to keep a fallback and prevent crash + ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); kryo.writeClassAndObject(output, obj); output.flush(); @@ -70,7 +72,9 @@ public static Object kryoDeserializeObject(final byte[] ser) { final Kryo kryo = new Kryo(); final Input input = new Input(new ByteArrayInputStream(ser)); - kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); + // Better initialization to keep a fallback and prevent crash + ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); + return kryo.readClassAndObject(input); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/ShardInfo.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/ShardInfo.java index c519485..6fb36cf 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/ShardInfo.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/ShardInfo.java @@ -15,27 +15,29 @@ package com.amazonaws.services.kinesis.stormspout; +import org.apache.commons.lang3.builder.ReflectionToStringBuilder; + import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.apache.commons.lang3.builder.ReflectionToStringBuilder; - /** * In this class we track the children for a shard. */ class ShardInfo { private final String shardId; + private final boolean open; private String mergesInto; private List splitsInto; /** * Creates a new ShardInfo representing a shard that does not split or merge. - * + * * @param shardId the Kinesis shard ID. */ - ShardInfo(String shardId) { + ShardInfo(String shardId, boolean open) { this.shardId = shardId; + this.open = open; this.mergesInto = ""; this.splitsInto = new ArrayList<>(2); } @@ -47,10 +49,17 @@ String getShardId() { return shardId; } + /** + * @return true iff the shard state is OPEN. + */ + boolean getShardOpen() { + return open; + } + /** * Define what the shard merges into. This is meant to be called at most once (not idempotent). * Cannot be called if addSplitsInto has been called. - * + * * @param mergeShardId set the shard ID that getShardId() merges into. */ void setMergesInto(String mergeShardId) { @@ -62,7 +71,7 @@ void setMergesInto(String mergeShardId) { /** * Define what the shard splits into. Can be called an arbitrary amount of times. Cannot be * called if setMergesInto has been called. - * + * * @param splitShardId add a shard ID to the list of shards getShardId() splits into. */ void addSplitsInto(String splitShardId) { @@ -72,7 +81,7 @@ void addSplitsInto(String splitShardId) { /** * @return the shard ID of the shard that getShardId() merges into. Empty string if the shard - * does not merge into another shard. + * does not merge into another shard. */ String getMergesInto() { return mergesInto; @@ -80,7 +89,7 @@ String getMergesInto() { /** * @return immutable view of the shards resulting from the split. Empty list if the shard does - * not split. + * not split. */ List getSplitsInto() { return Collections.unmodifiableList(splitsInto); diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/serializer/RecordSerializer.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/serializer/RecordSerializer.java new file mode 100644 index 0000000..2c458e3 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/serializer/RecordSerializer.java @@ -0,0 +1,45 @@ +package com.amazonaws.services.kinesis.stormspout.serializer; + +import com.amazonaws.services.kinesis.model.Record; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoException; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +import java.nio.ByteBuffer; + +/** + * author: matt + */ +public class RecordSerializer extends Serializer { + + @Override + public void write (Kryo kryo, Output output, Record record) { + ByteBuffer bb = record.getData(); + if (!bb.hasArray()) { + throw new KryoException("Unable to read ByteBuffer of data from Record"); + } + output.writeInt(bb.array().length); + output.writeBytes(bb.array()); + output.writeString(record.getSequenceNumber()); + output.writeString(record.getPartitionKey()); + kryo.writeClassAndObject(output, null); + } + + @Override + public Record read (Kryo kryo, Input input, Class type) { + Record record = new Record(); + int size = input.readInt(); + byte[] bytes = new byte[size]; + int count = 0; + while (count < size) { + bytes[count] = input.readByte(); + count++; + } + record.withData(ByteBuffer.wrap(bytes)); + record.withSequenceNumber(input.readString()); + record.withPartitionKey(input.readString()); + return record; + } +} \ No newline at end of file diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/InflightRecordTracker.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/InflightRecordTracker.java index 17864bd..315bdd4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/InflightRecordTracker.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/InflightRecordTracker.java @@ -15,24 +15,24 @@ package com.amazonaws.services.kinesis.stormspout.state.zookeeper; +import com.amazonaws.services.kinesis.model.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.math.BigInteger; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; import java.util.Queue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.amazonaws.services.kinesis.model.Record; - // @formatter:off + /** * This class helps track in-flight records/tuples so we can provide record level ack/fail semantics, * while fetching records from Kinesis in batches. This class computes and tracks the checkpoint sequence number * based on messages that have been acked, failed, or are inflight. * We will checkpoint at the highest sequence number for a record which has been acked or retried up to the retry limit. - * + *

* We temporarily buffer records which have not been acked or exhausted retries, so we can re-emit them upon failure * without having to refetch them from Kinesis. * Since acks/fails can arrive in an order different from the order in which the records were @@ -41,13 +41,13 @@ * acks in the order [125, 101, 132]. When we get the ack for 125, we keep keep it around. When we get an ack for 101, * we remove both 101 and 125 and set checkpointSequenceNumber to 125. When we get the ack for 132, we remove it and * set the checkpointSequenceNumber to 132. - * + *

* Here's the table with info on processing acks. We maintain an ordered list of records (asc seq num). * In the table below we describe how we process an ack for a ("current") record based on the status of the previous * and next records in the ordered list (none means there is no previous/next record, keep means we don't delete * the current record). - * - * + *

+ *

* +---------+-------------------------------+-------------------------------+-------------------------------+ * |Next---> | | | | * |Previous | acked | pending | none | @@ -61,7 +61,6 @@ * | none | delete, delete next, and set | delete and advance checkpoint | delete and advance checkpoint | * | | checkpoint to seq num of next | to seq num of current record | to seq num of current record | * +---------+-------------------------------+-------------------------------+-------------------------------+ - * */ // @formatter:on class InflightRecordTracker { @@ -123,7 +122,7 @@ void onEmit(final Record record, boolean isRetry) { } if (LOG.isDebugEnabled()) { LOG.debug("Shard " + shardId + ": Recorded emit for seq num " + sequenceNumber + ", isRetry = " + isRetry - + ", retryNum = " + retryNum); + + ", retryNum = " + retryNum); } } else { if (LOG.isDebugEnabled()) { @@ -216,9 +215,23 @@ Record recordToRetry() { return recordToRetry; } + Record getInflightRecord(final String sequenceNumber) { + RecordNode node = seqNumToRecordInfoMap.get(sequenceNumber); + LOG.info(this + " getInflightRecord: node = " + node + ", sequenceNumber = " + sequenceNumber + ", shardId = " + + shardId + ", checkpointSequenceNumber = " + checkpointSequenceNumber); + Record record = node.getRecord(); + return record; + } + + Record getEarliestInflightRecord() { + if (recordNodeList.size() > 0) + return recordNodeList.getFirst().getRecord(); + return null; + } + /** * Note: This has package level access solely for testing purposes. - * + * * @return List of in-flight records */ RecordNodeList getRecordNodeList() { @@ -227,7 +240,7 @@ RecordNodeList getRecordNodeList() { /** * Note: This method has package level access solely for testing purposes. - * + * * @return SequenceNumber->RecordNode map of in-flight records. */ Map getSequenceNumberToRecordNodeMap() { @@ -236,7 +249,7 @@ Map getSequenceNumberToRecordNodeMap() { /** * Note: This method has package level access solely for testing purposes. - * + * * @return Queue of records to retry */ Queue getRetryQueue() { @@ -330,11 +343,11 @@ class RecordNodeList { /** * Adds record to the end of the list. - * + * * @param record Record will be added to end of the list * @return Newly added node * @throws IllegalArgumentException Thrown if the sequence number of the record is lower than the (current) last - * node of the list + * node of the list */ RecordNode addToList(Record record) { RecordNode node = new RecordNode(record); diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/LocalShardState.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/LocalShardState.java index ef1c460..98278a9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/LocalShardState.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/LocalShardState.java @@ -15,14 +15,13 @@ package com.amazonaws.services.kinesis.stormspout.state.zookeeper; +import com.amazonaws.services.kinesis.model.Record; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.amazonaws.services.kinesis.model.Record; - /** * This class tracks the state of a shard (e.g. current shard position). */ @@ -36,10 +35,10 @@ class LocalShardState { /** * Constructor. - * - * @param shardId ID of the shard this LocalShardState is tracking. + * + * @param shardId ID of the shard this LocalShardState is tracking. * @param latestZookeeperSeqNum the last checkpoint stored in Zookeeper. - * @param recordRetryLimit Number of times a failed record should be retried. + * @param recordRetryLimit Number of times a failed record should be retried. */ LocalShardState(final String shardId, final String latestZookeeperSeqNum, final int recordRetryLimit) { this.shardId = shardId; @@ -50,7 +49,7 @@ class LocalShardState { /** * Call when a record is emitted in nextTuple. * - * @param record the Kinesis record emitted. + * @param record the Kinesis record emitted. * @param isRetry Is this a retry attempt of a previously emitted record. */ void emit(final Record record, boolean isRetry) { @@ -61,17 +60,17 @@ void emit(final Record record, boolean isRetry) { * Call when a record is acknowledged. This will try to update the latest offset to be * stored in Zookeeper, if possible. * - * @param seqNum the sequence number of the record. + * @param seqNum the sequence number of the record. */ void ack(final String seqNum) { tracker.onAck(seqNum); } - /** + /** * Call when a record is failed. It is then added to a retry queue that is queried by * nextTuple(). * - * @param failedSequenceNumber sequence number of failed record. + * @param failedSequenceNumber sequence number of failed record. */ void fail(final String failedSequenceNumber) { tracker.onFail(failedSequenceNumber); @@ -79,8 +78,9 @@ void fail(final String failedSequenceNumber) { /** * Get a record to retry. - * + *

* Pre : shouldRetry(). + * * @return a record to retry - may be null if we can't find a record to retry. */ Record recordToRetry() { @@ -88,6 +88,14 @@ Record recordToRetry() { return tracker.recordToRetry(); } + Record getInflightRecord(final String seqNum) { + return tracker.getInflightRecord(seqNum); + } + + Record getEarliestInflightRecord() { + return tracker.getEarliestInflightRecord(); + } + /** * @return true if there are sequence numbers that need to be retried. */ @@ -114,6 +122,7 @@ boolean isDirty() { /** * Record the sequenced number we checkpointed. + * * @param checkpointSequenceNumber Sequence number we used to checkpoint. */ void commit(String checkpointSequenceNumber) { @@ -124,7 +133,7 @@ void commit(String checkpointSequenceNumber) { * Helper log function. Checks that debug logging is enabled before evaluating the * detailedToString() function. * - * @param prefix prefix to prepend to log message. + * @param prefix prefix to prepend to log message. */ void logMe(String prefix) { if (LOG.isDebugEnabled()) { @@ -135,8 +144,8 @@ void logMe(String prefix) { @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) - .append("shardId", shardId) - .toString(); + .append("shardId", shardId) + .toString(); } private String detailedToString() { diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/ZookeeperShardState.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/ZookeeperShardState.java index 1b4a72f..a22e175 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/ZookeeperShardState.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/ZookeeperShardState.java @@ -15,10 +15,16 @@ package com.amazonaws.services.kinesis.stormspout.state.zookeeper; -import java.io.IOException; -import java.util.Random; -import java.util.concurrent.Callable; - +import com.amazonaws.services.kinesis.stormspout.KinesisSpoutConfig; +import com.amazonaws.services.kinesis.stormspout.exceptions.KinesisSpoutException; +import com.amazonaws.services.kinesis.stormspout.state.zookeeper.NodeFunction.Mod; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import org.apache.curator.RetryLoop; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; @@ -26,16 +32,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.amazonaws.services.kinesis.stormspout.KinesisSpoutConfig; -import com.amazonaws.services.kinesis.stormspout.exceptions.KinesisSpoutException; -import com.amazonaws.services.kinesis.stormspout.state.zookeeper.NodeFunction.Mod; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableList; -import com.netflix.curator.RetryLoop; -import com.netflix.curator.framework.CuratorFramework; -import com.netflix.curator.framework.CuratorFrameworkFactory; -import com.netflix.curator.retry.ExponentialBackoffRetry; +import java.util.Random; +import java.util.concurrent.Callable; /** * Handles communication with Zookeeper and methods specific to the spout for saving/restoring @@ -55,7 +53,8 @@ class ZookeeperShardState { /** * Create and configure the ZK sync object with the KinesisSpoutConfig. - * @param config the configuration for the spout. + * + * @param config the configuration for the spout. */ ZookeeperShardState(final KinesisSpoutConfig config) { this.config = config; @@ -64,18 +63,19 @@ class ZookeeperShardState { try { zk = CuratorFrameworkFactory.newClient(config.getZookeeperConnectionString(), new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_NUM_RETRIES)); - } catch (IOException e) { + zk.start(); + + } catch (Exception e) { LOG.error("Could not connect to ZooKeeper", e); throw new KinesisSpoutException(e); } - zk.start(); } /** * Initialize the shardList in ZK. This is called by every spout task on activate(), and ensures * that the shardList is up to date and correct. * - * @param shards list of shards (output of DescribeStream). + * @param shards list of shards (output of DescribeStream). * @throws Exception */ void initialize(final ImmutableList shards) throws Exception { @@ -96,9 +96,17 @@ public byte[] initialize() { @Override public Mod apply(byte[] x) { - // At this point, we don't support resharding. We assume the shard list is valid if one exists. - LOG.info("ShardList already initialized in Zookeeper. Assuming it is valid."); - return Mod.noModification(); + // Implies that every spout will attempt to update the ZK shardList when a reshard occurs. + LOG.info(this + " Re-initialization of shardList: " + shards); + ShardListV0 shardList = new ShardListV0(shards); + ObjectMapper objectMapper = new ObjectMapper(); + byte[] data; + try { + data = objectMapper.writeValueAsBytes(shardList); + } catch (JsonProcessingException e) { + throw new KinesisSpoutException("Unable to serialize shardList " + shardList, e); + } + return Mod.modification(data); } }; @@ -118,8 +126,8 @@ void clearShardList() throws Exception { /** * Commit the checkpoint sequence number for a shard to Zookeeper. * - * @param shardId shard to commit to. - * @param seqNum sequence number to commit. + * @param shardId shard to commit to. + * @param seqNum sequence number to commit. * @throws Exception */ void commitSeqNum(final String shardId, final String seqNum) throws Exception { @@ -132,7 +140,7 @@ void commitSeqNum(final String shardId, final String seqNum) throws Exception { /** * Get the last committed sequence number for the shard from Zookeeper. * - * @param shardId shard to read from. + * @param shardId shard to read from. * @return a sequence number if the state exists, empty string otherwise. * @throws Exception */ @@ -162,7 +170,7 @@ ImmutableList getShardList() throws Exception { /** * Set a watcher for the shardList. - * + * * @param callback Zookeeper watcher to be set on the shard list. * @throws Exception */ @@ -180,7 +188,7 @@ boolean isShardList(final String path) { /** * Closes the connection to ZK. - * + * * @throws InterruptedException */ void close() throws InterruptedException { @@ -190,9 +198,9 @@ void close() throws InterruptedException { /** * Optimistic concurrency scheme for tryAtomicUpdate. Try to update, and keep trying * until successful. - * + * * @param pathSuffix suffix to use to build path in ZooKeeper. - * @param f function used to initialize the node, or transform the data already there. + * @param f function used to initialize the node, or transform the data already there. * @throws Exception */ private void atomicUpdate(final String pathSuffix, final NodeFunction f) throws Exception { @@ -207,7 +215,7 @@ public Boolean call() throws Exception { }); Thread.sleep(BASE_OPTIMISTIC_RETRY_TIME_MS + rand.nextInt(BASE_OPTIMISTIC_RETRY_TIME_MS)); - } while(!done); + } while (!done); } private byte[] get(final String pathSuffix) throws Exception { @@ -247,11 +255,11 @@ public Void call() throws Exception { /** * Try to atomically update a node in ZooKeeper, creating it if it doesn't exist. This is * meant to be used within an optimistic concurrency model. - * + * * @param pathSuffix suffix to use to build path in ZooKeeper. - * @param f function used to initialize the node, or transform the data already there. + * @param f function used to initialize the node, or transform the data already there. * @return true if node was created/updated, false if a concurrent modification occurred - * and succeeded while trying to update/create the node. + * and succeeded while trying to update/create the node. * @throws Exception */ private boolean tryAtomicUpdate(final String pathSuffix, final NodeFunction f) throws Exception { @@ -260,8 +268,8 @@ private boolean tryAtomicUpdate(final String pathSuffix, final NodeFunction f) t if (stat == null) { try { - zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) - .forPath(path, f.initialize()); + LOG.info("creating ZK data at path " + path); + zk.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, f.initialize()); } catch (KeeperException.NodeExistsException e) { LOG.debug("Concurrent creation of " + path + ", retrying", e); return false; @@ -271,6 +279,7 @@ private boolean tryAtomicUpdate(final String pathSuffix, final NodeFunction f) t if (newVal.hasModification()) { try { + LOG.info("updating ZK data at path " + path); zk.setData().withVersion(stat.getVersion()).forPath(path, newVal.get()); } catch (KeeperException.BadVersionException e) { LOG.debug("Concurrent update to " + path + ", retrying.", e); @@ -284,6 +293,6 @@ private boolean tryAtomicUpdate(final String pathSuffix, final NodeFunction f) t private String buildZookeeperPath(final String suffix) { return "/" + config.getZookeeperPrefix() + "/" + config.getTopologyName() + "/" - + config.getStreamName() + "/" + suffix; + + config.getStreamName() + "/" + suffix; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/ZookeeperStateManager.java b/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/ZookeeperStateManager.java index 09ee788..aa86a20 100644 --- a/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/ZookeeperStateManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/stormspout/state/zookeeper/ZookeeperStateManager.java @@ -15,14 +15,13 @@ package com.amazonaws.services.kinesis.stormspout.state.zookeeper; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; - +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.stormspout.*; +import com.amazonaws.services.kinesis.stormspout.exceptions.InvalidSeekPositionException; +import com.amazonaws.services.kinesis.stormspout.exceptions.KinesisSpoutException; +import com.amazonaws.services.kinesis.stormspout.state.IKinesisSpoutStateManager; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.zookeeper.WatchedEvent; @@ -31,18 +30,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.amazonaws.services.kinesis.model.Record; -import com.amazonaws.services.kinesis.stormspout.IShardGetter; -import com.amazonaws.services.kinesis.stormspout.IShardGetterBuilder; -import com.amazonaws.services.kinesis.stormspout.IShardListGetter; -import com.amazonaws.services.kinesis.stormspout.InitialPositionInStream; -import com.amazonaws.services.kinesis.stormspout.KinesisSpoutConfig; -import com.amazonaws.services.kinesis.stormspout.ShardPosition; -import com.amazonaws.services.kinesis.stormspout.exceptions.InvalidSeekPositionException; -import com.amazonaws.services.kinesis.stormspout.exceptions.KinesisSpoutException; -import com.amazonaws.services.kinesis.stormspout.state.IKinesisSpoutStateManager; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterators; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; /** * Zookeeper backed IKinesisSpoutStateManager. @@ -50,24 +44,26 @@ public class ZookeeperStateManager implements Watcher, IKinesisSpoutStateManager { private static final Logger LOG = LoggerFactory.getLogger(ZookeeperStateManager.class); + // initialized at construction private final KinesisSpoutConfig config; + private final ShardPosition seekToOnOpen; private final IShardListGetter shardListGetter; private final IShardGetterBuilder getterBuilder; - private final ShardPosition seekToOnOpen; + // updated at activate and deactivate private ZookeeperShardState zk; + private boolean active; private int taskIndex; private int totalNumTasks; - private boolean active; - private ImmutableList getters; + // initialized at bootstrap private Iterator currentGetter; private Map shardStates; /** - * @param config Spout configuration with ZK preferences. + * @param config Spout configuration with ZK preferences. * @param shardListGetter Used to fetch the list of shards in the stream. - * @param getterBuilder Used to build getters for shards a task is responsible for. + * @param getterBuilder Used to build getters for shards a task is responsible for. * @param initialPosition Fetch records from this position when there is no pre-existing ZK state. */ public ZookeeperStateManager( @@ -95,29 +91,33 @@ private ShardPosition getShardPosition(InitialPositionInStream initialPosition) return position; } - /* (non-Javadoc) - * @see com.amazonaws.services.kinesis.stormspout.state.IKinesisSpoutStateManager#activate() + /** + * Ensure that the task can safely be activated + * This will take care of making sure the list is sorted too. */ - @Override - public void activate() { - this.zk = new ZookeeperShardState(config); - this.active = true; - - // Ensure that the task can safely be activated - // This will take care of making sure the list is sorted too. + public void initialize() { ImmutableList shardList = ImmutableList.copyOf(shardListGetter.getShardList().keySet()); LOG.info(this + "Activating with shardList " + shardList); try { zk.initialize(shardList); - // Hook shardList watcher for the first time. zk.watchShardList(this); } catch (Exception e) { LOG.error(this + " something went wrong while initializing Zookeeper shardList." - + " Assuming it is unsafe to continue.", e); + + " Assuming it is unsafe to continue.", e); throw new KinesisSpoutException(e); } } + /* (non-Javadoc) + * @see com.amazonaws.services.kinesis.stormspout.state.IKinesisSpoutStateManager#activate() + */ + @Override + public void activate() { + this.zk = new ZookeeperShardState(config); + this.active = true; + initialize(); + } + /* (non-Javadoc) * @see com.amazonaws.services.kinesis.stormspout.state.IKinesisSpoutStateManager#deactivate() */ @@ -265,6 +265,8 @@ public void commitShardStates() { // They are all synchronized on the instance of this class. @Override public synchronized void process(WatchedEvent event) { + LOG.debug(this + " watched event occurred: " + event); + checkState(active, "Cannot process events if state is not active (a ZK" + " connection is necessary)."); @@ -280,7 +282,7 @@ public synchronized void process(WatchedEvent event) { // Failure is fatal for the task (and it's been retried, so it's indicative of a // bigger Zookeeper/global state issue). LOG.error(this + " failure to re-attach event handler for ZK node " - + event.getPath(), e); + + event.getPath(), e); throw new KinesisSpoutException(e); } @@ -288,55 +290,81 @@ public synchronized void process(WatchedEvent event) { // a reshard, and we need to sync with the state in ZK. if (event.getType() == EventType.NodeDataChanged && zk.isShardList(event.getPath())) { LOG.info(this + " detected change in shardList. Committing current shard state and " - + "reinitializing spout task from ZK."); - + + "reinitializing spout task from ZK."); commitShardStates(); bootstrapStateFromZookeeper(); } } + /** + * Will re-initialize the shard list, causing the watcher to trigger and detect the change in the shard list. + */ + public void handleReshard() { + initialize(); + } + @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) - .append("taskIndex", taskIndex) - .toString(); + .append("taskIndex", taskIndex) + .toString(); } // Recomputes shard assignment for the current task. Then, recreates the local shard state // and the getters from whatever data is in Zookeeper. private void bootstrapStateFromZookeeper() { ImmutableList shardAssignment = getShardAssignment(); + ImmutableList getters; // Task could not get an assignment (e.g. there are too many tasks for too few shards). if (shardAssignment.isEmpty()) { - this.shardStates = new HashMap<>(); - this.getters = ImmutableList.of(); + shardStates = new HashMap(); + getters = ImmutableList.of(); } else { - this.shardStates = makeLocalState(shardAssignment); - this.getters = makeGetters(shardAssignment); + updateLocalState(shardAssignment); + getters = makeGetters(shardAssignment); } this.currentGetter = Iterators.cycle(getters); LOG.info(this + " got getter assignment. Handling " + getters + "."); } - // Create the local shard state from Zookeeper. - private Map makeLocalState(ImmutableList shardAssignment) { - Map state = new HashMap<>(); + // Create/update the local shard state from Zookeeper. + private void updateLocalState(ImmutableList shardAssignment) { + // first initialization of shardStates + if (shardStates == null) { + shardStates = new HashMap(); + } + // remove shard state that we're not longer responsible for + Iterator> iter = shardStates.entrySet().iterator(); + while (iter.hasNext()) { + Entry entry = iter.next(); + final String shardId = entry.getKey(); + if (!shardAssignment.contains(shardId)) { + LOG.info(this + " removing stale shard state for shard " + shardId); + iter.remove(); + } + } for (final String shardId : shardAssignment) { + // first check for pre-existing shard state... + if (shardStates.containsKey(shardId)) { + final LocalShardState st = shardStates.get(shardId); + LOG.info(this + " keeping existing shard state for shard " + shardId + " with earliest inflight seqnum " + st.getEarliestInflightRecord()); + continue; + } + // otherwise fallback to zk committed shard state String latestValidSeqNum; try { latestValidSeqNum = zk.getLastCommittedSeqNum(shardId); + LOG.info(this + " fell back to zk record with seqnum (" + latestValidSeqNum + ") for shard " + shardId); } catch (Exception e) { - LOG.error(this + " could not retrieve last committed seqnum for " + shardId - + " from ZooKeeper. Starting from default getter position."); latestValidSeqNum = ""; + LOG.error(this + " could not retrieve last committed seqnum for " + shardId + + " from ZooKeeper. Starting from default getter position."); } - state.put(shardId, new LocalShardState(shardId, latestValidSeqNum, config.getRecordRetryLimit())); + shardStates.put(shardId, new LocalShardState(shardId, latestValidSeqNum, config.getRecordRetryLimit())); } - - return state; } // Opens getters based on shard assignment and local shard state, and seeks them to seekToOnOpen. @@ -346,16 +374,29 @@ private ImmutableList makeGetters(ImmutableList shardAssig final ImmutableList myGetters = getterBuilder.buildGetters(shardAssignment); - for (final IShardGetter getter: myGetters) { + for (final IShardGetter getter : myGetters) { final String shardId = getter.getAssociatedShard(); final LocalShardState shardState = safeGetShardState(shardId); try { - if (shardState.getLatestValidSeqNum().isEmpty() && seekToOnOpen != null) { - getter.seek(seekToOnOpen); + if (shardState.getEarliestInflightRecord() != null) { + // seek getter to earliest inflight record + if (LOG.isDebugEnabled()) { + LOG.debug(this + " seeking to early inflight " + shardState.getEarliestInflightRecord().getSequenceNumber()); + } + getter.seek(ShardPosition.afterSequenceNumber(shardState.getEarliestInflightRecord().getSequenceNumber())); } else if (!shardState.getLatestValidSeqNum().isEmpty()) { - getter.seek(ShardPosition.afterSequenceNumber( - shardState.getLatestValidSeqNum())); + // we fell back to zk state, seek shard state to appropriate position + if (LOG.isDebugEnabled()) { + LOG.debug(this + " seeking to committed " + shardState.getLatestValidSeqNum()); + } + getter.seek(ShardPosition.afterSequenceNumber(shardState.getLatestValidSeqNum())); + } else if (seekToOnOpen != null) { + if (LOG.isDebugEnabled()) { + LOG.debug(this + " seeking to default " + seekToOnOpen); + } + // no pre-existing shard state or valid zk state, seek to default + getter.seek(seekToOnOpen); } } catch (InvalidSeekPositionException e) { LOG.error(this + " tried to seek getter " + getter + " to an invalid position.", e); @@ -378,7 +419,7 @@ private ImmutableList getShardAssignment() { LOG.info(this + " Got shardList: " + shardList); } catch (Exception e) { LOG.error(this + " could not compute shard assigment: could not retrieve shard list" - + " from ZK.", e); + + " from ZK.", e); throw new KinesisSpoutException(e); } @@ -393,7 +434,7 @@ private ImmutableList getShardAssignment() { private LocalShardState safeGetShardState(final String shardId) { final LocalShardState st = shardStates.get(shardId); checkNotNull(st, "Shard state map inconsistent with shard assignment (could not get" - + " shardId=" + shardId + ")."); + + " shardId=" + shardId + ")."); return st; } } diff --git a/src/main/samples/SampleBolt.java b/src/main/samples/SampleBolt.java index f42e757..d949ede 100644 --- a/src/main/samples/SampleBolt.java +++ b/src/main/samples/SampleBolt.java @@ -14,23 +14,20 @@ */ +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseBasicBolt; -import backtype.storm.tuple.Tuple; - -import com.amazonaws.services.kinesis.model.Record; - public class SampleBolt extends BaseBasicBolt { private static final long serialVersionUID = 177788290277634253L; private static final Logger LOG = LoggerFactory.getLogger(SampleBolt.class); @@ -40,14 +37,14 @@ public class SampleBolt extends BaseBasicBolt { public void prepare(Map stormConf, TopologyContext context) { decoder = Charset.forName("UTF-8").newDecoder(); } - + @Override public void execute(Tuple input, BasicOutputCollector collector) { - String partitionKey = (String)input.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY); - String sequenceNumber = (String)input.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER); - byte[] payload = (byte[])input.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA); + String partitionKey = (String) input.getValueByField(SampleKinesisRecordScheme.FIELD_PARTITION_KEY); + String sequenceNumber = (String) input.getValueByField(SampleKinesisRecordScheme.FIELD_SEQUENCE_NUMBER); + byte[] payload = (byte[]) input.getValueByField(SampleKinesisRecordScheme.FIELD_RECORD_DATA); ByteBuffer buffer = ByteBuffer.wrap(payload); - String data = null; + String data = null; try { data = decoder.decode(buffer).toString(); } catch (CharacterCodingException e) { diff --git a/src/main/samples/SampleKinesisRecordScheme.java b/src/main/samples/SampleKinesisRecordScheme.java index 702783e..89306cd 100644 --- a/src/main/samples/SampleKinesisRecordScheme.java +++ b/src/main/samples/SampleKinesisRecordScheme.java @@ -13,13 +13,12 @@ * permissions and limitations under the License. */ -import java.util.ArrayList; -import java.util.List; - -import backtype.storm.tuple.Fields; - import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.stormspout.IKinesisRecordScheme; +import org.apache.storm.tuple.Fields; + +import java.util.ArrayList; +import java.util.List; /** * Sample scheme for emitting Kinesis records as tuples. It emits a tuple of (partitionKey, sequenceNumber, and data). @@ -30,12 +29,12 @@ public class SampleKinesisRecordScheme implements IKinesisRecordScheme { * Name of the (partition key) value in the tuple. */ public static final String FIELD_PARTITION_KEY = "partitionKey"; - + /** * Name of the sequence number value in the tuple. */ public static final String FIELD_SEQUENCE_NUMBER = "sequenceNumber"; - + /** * Name of the Kinesis record data value in the tuple. */ diff --git a/src/main/samples/SampleTopology.java b/src/main/samples/SampleTopology.java index b31be26..680ae0c 100644 --- a/src/main/samples/SampleTopology.java +++ b/src/main/samples/SampleTopology.java @@ -14,27 +14,27 @@ */ -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Properties; - -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.AlreadyAliveException; -import backtype.storm.generated.InvalidTopologyException; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; - import com.amazonaws.ClientConfiguration; import com.amazonaws.regions.Regions; import com.amazonaws.services.kinesis.stormspout.InitialPositionInStream; import com.amazonaws.services.kinesis.stormspout.KinesisSpout; import com.amazonaws.services.kinesis.stormspout.KinesisSpoutConfig; +import com.amazonaws.services.kinesis.stormspout.serializer.RecordSerializer; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.AuthorizationException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; public class SampleTopology { private static final Logger LOG = LoggerFactory.getLogger(SampleTopology.class); @@ -46,17 +46,17 @@ public class SampleTopology { private static String zookeeperEndpoint; private static String zookeeperPrefix; - public static void main(String[] args) throws IllegalArgumentException, KeeperException, InterruptedException, AlreadyAliveException, InvalidTopologyException, IOException { + public static void main(String[] args) throws IllegalArgumentException, KeeperException, InterruptedException, AlreadyAliveException, InvalidTopologyException, IOException, AuthorizationException { String propertiesFile = null; String mode = null; - + if (args.length != 2) { printUsageAndExit(); } else { propertiesFile = args[0]; mode = args[1]; } - + configure(propertiesFile); final KinesisSpoutConfig config = @@ -76,6 +76,7 @@ public static void main(String[] args) throws IllegalArgumentException, KeeperEx Config topoConf = new Config(); topoConf.setFallBackOnJavaSerialization(true); + topoConf.registerSerialization(Record.class, RecordSerializer.class); topoConf.setDebug(false); if (mode.equals("LocalMode")) { @@ -85,13 +86,13 @@ public static void main(String[] args) throws IllegalArgumentException, KeeperEx topoConf.setNumWorkers(1); topoConf.setMaxSpoutPending(5000); LOG.info("Submitting sample topology " + topologyName + " to remote cluster."); - StormSubmitter.submitTopology(topologyName, topoConf, builder.createTopology()); + StormSubmitter.submitTopology(topologyName, topoConf, builder.createTopology()); } else { printUsageAndExit(); } } - + private static void configure(String propertiesFile) throws IOException { FileInputStream inputStream = new FileInputStream(propertiesFile); Properties properties = new Properties(); @@ -115,10 +116,10 @@ private static void configure(String propertiesFile) throws IOException { String initialPositionOverride = properties.getProperty(ConfigKeys.INITIAL_POSITION_IN_STREAM_KEY); if (initialPositionOverride != null) { - initialPositionInStream = InitialPositionInStream.valueOf(initialPositionOverride); + initialPositionInStream = InitialPositionInStream.valueOf(initialPositionOverride); } LOG.info("Using initial position " + initialPositionInStream.toString() + " (if a checkpoint is not found)."); - + String recordRetryLimitOverride = properties.getProperty(ConfigKeys.RECORD_RETRY_LIMIT); if (recordRetryLimitOverride != null) { recordRetryLimit = Integer.parseInt(recordRetryLimitOverride.trim()); @@ -138,13 +139,13 @@ private static void configure(String propertiesFile) throws IOException { LOG.info("Using zookeeper endpoint " + zookeeperEndpoint); String zookeeperPrefixOverride = properties.getProperty(ConfigKeys.ZOOKEEPER_PREFIX_KEY); - if (zookeeperPrefixOverride != null) { + if (zookeeperPrefixOverride != null) { zookeeperPrefix = zookeeperPrefixOverride; } LOG.info("Using zookeeper prefix " + zookeeperPrefix); } - + private static void printUsageAndExit() { System.out.println("Usage: " + SampleTopology.class.getName() + " "); System.exit(-1);