This repository has been archived by the owner on Apr 13, 2018. It is now read-only.

Adds reshard handling support.
atroschinetz-rmn committed May 8, 2015
1 parent e3792d5 commit cecd8e8
Showing 13 changed files with 214 additions and 115 deletions.
8 changes: 2 additions & 6 deletions README.md
@@ -16,7 +16,7 @@ The Amazon Kinesis Storm spout fetches data records from Amazon Kinesis and emit

The Amazon Kinesis Storm spout can be configured to retry failed records. By default, it retries a failed record 3 times. If a record fails and the retry limit has been reached, the spout will log an error and skip over the record. The spout buffers pending records in memory, so it can re-emit a failed record without having to re-fetch the record from Amazon Kinesis. The spout sets the checkpoint to the highest sequence number that has been ack'ed (or has exhausted its retry attempts).

To use the spout, you'll need to add it to your Storm topology.
To use the spout, you'll need to add it to your Storm topology.

+ **KinesisSpout**: Constructs an instance of the spout, using your AWS credentials and the configuration specified in KinesisSpoutConfig (as well as com.amazonaws.ClientConfiguration, via the AWS SDK). Each task executed by the spout operates on a distinct set of Amazon Kinesis shards. Shard states are periodically committed to ZooKeeper. When the spout is deactivated, it will disconnect from ZooKeeper, but the spout will continue monitoring its local state so you can activate it again later.
+ **KinesisSpoutConfig**: Configures the spout, including the Storm topology name, the Amazon Kinesis stream name, the endpoint for connecting to ZooKeeper, and the prefix for the ZooKeeper paths where the spout state is stored. See the samples folder for configuration examples.
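For orientation, here is a minimal sketch (not part of this commit) of wiring these two pieces into a topology. The `KinesisSpout` constructor matches the signature shown in the KinesisSpout diff below; the `KinesisSpoutConfig` constructor arguments and the `with*` setters are assumptions, so consult the samples folder for the canonical usage.

```java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.stormspout.KinesisSpout;
import com.amazonaws.services.kinesis.stormspout.KinesisSpoutConfig;

public class MinimalKinesisTopology {
    public static void main(String[] args) throws Exception {
        // Illustrative only: the constructor arguments and setter names below are assumptions.
        KinesisSpoutConfig spoutConfig =
                new KinesisSpoutConfig("myKinesisStream", "zookeeper-host:2181")
                        .withZookeeperPrefix("kinesis_spout")
                        .withRecordRetryLimit(3);   // default retry limit described above

        // Constructor signature as shown in the KinesisSpout diff in this commit.
        KinesisSpout spout = new KinesisSpout(spoutConfig,
                new DefaultAWSCredentialsProviderChain(), new ClientConfiguration());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kinesis_spout", spout, 2);  // each spout task handles a distinct set of shards
        // builder.setBolt(...) attaches processing bolts here.

        StormSubmitter.submitTopology("SampleTopology", new Config(), builder.createTopology());
    }
}
```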
@@ -28,18 +28,14 @@ The samples folder includes a sample topology and sample bolt, using the number

1. Edit the *.properties file to configure your Storm topology, Amazon Kinesis stream, and ZooKeeper details. For your AWS Credentials, we recommend using IAM roles on Amazon EC2 when possible. You can also specify your credentials using system properties, environment variables, or AwsCredentials.properties.
2. Package the spout and the sample (including all dependencies but excluding Storm itself) into one JAR file.
3. Deploy the package to Storm via the JAR file, e.g., `storm jar my-spout-sample.jar SampleTopology sample.properties RemoteMode`
3. Deploy the package to Storm via the JAR file, e.g., `storm jar my-spout-sample.jar SampleTopology sample.properties RemoteMode`

## Release Notes
### Release 1.1 (October 21, 2014)
+ Added support for retrying failed records
+ Added region name support


### Future Work

+ Handle closed, split, and merged shards

## Related Resources

[Amazon Kinesis Developer Guide](http://docs.aws.amazon.com/kinesis/latest/dev/introduction.html)
6 changes: 3 additions & 3 deletions pom.xml
@@ -6,7 +6,7 @@
<artifactId>kinesis-storm-spout</artifactId>
<packaging>jar</packaging>
<name>Amazon Kinesis Storm Spout for Java</name>
<version>1.1.0</version>
<version>1.2.0</version>
<description> The Amazon Kinesis Storm Spout helps Java developers integrate Amazon Kinesis with Storm.</description>
<url>https://aws.amazon.com/kinesis</url>

@@ -30,8 +30,8 @@
<guava.version>13.0-final</guava.version>
<commons-lang3.version>3.0</commons-lang3.version>
</properties>


<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
@@ -72,7 +72,7 @@ public Records getNext(int maxNumberOfRecords) {
ensureBuffered();

if (!it.hasNext() && buffer.isEndOfShard()) {
return new Records(ImmutableList.<Record> of(), true);
return new Records(ImmutableList.<Record> of(), true, buffer.isReshard());
}

ImmutableList.Builder<Record> recs = new ImmutableList.Builder<>();
@@ -94,7 +94,7 @@ public Records getNext(int maxNumberOfRecords) {
}
}

return new Records(recs.build(), false);
return new Records(recs.build(), false, buffer.isReshard());
}

@Override
@@ -151,7 +151,8 @@ private String addTruncatedShardList(final Map<String, ShardInfo> spoutShards, f

for (Shard s : streamShards) {
currShard = s.getShardId();
spoutShards.put(s.getShardId(), new ShardInfo(s.getShardId()));
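// A shard whose sequence-number range has no ending sequence number is still open;
// shards that have been closed by a split or merge carry an ending sequence number.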
final boolean open = s.getSequenceNumberRange().getEndingSequenceNumber() == null;
spoutShards.put(s.getShardId(), new ShardInfo(s.getShardId(), open));

if (s.getParentShardId() != null && s.getAdjacentParentShardId() != null) {
// It's a merge. Set both parents of the merge to merge into this shard.
@@ -49,20 +49,20 @@ class KinesisShardGetter implements IShardGetter {
private static final long BACKOFF_MILLIS = 500L;

private final String streamName;
private final String shardId;
private final ShardInfo shard;
private final AmazonKinesisClient kinesisClient;

private String shardIterator;
private ShardPosition positionInShard;

/**
* @param streamName Name of the Kinesis stream
* @param shardId Fetch data from this shard
* @param shard Fetch data from this shard
* @param kinesisClient Kinesis client to use when making requests.
*/
KinesisShardGetter(final String streamName, final String shardId, final AmazonKinesisClient kinesisClient) {
KinesisShardGetter(final String streamName, final ShardInfo shard, final AmazonKinesisClient kinesisClient) {
this.streamName = streamName;
this.shardId = shardId;
this.shard = shard;
this.kinesisClient = kinesisClient;
this.shardIterator = "";
this.positionInShard = ShardPosition.end();
@@ -72,12 +72,12 @@ class KinesisShardGetter implements IShardGetter {
public Records getNext(int maxNumberOfRecords)
throws AmazonClientException, ResourceNotFoundException, InvalidArgumentException {
if (shardIterator == null) {
LOG.debug(this + " Null shardIterator for " + shardId + ". This can happen if shard is closed.");
LOG.debug(this + " Null shardIterator for " + shard.getShardId() + ". This can happen if shard is closed.");
return Records.empty(true);
}

final ImmutableList.Builder<Record> records = new ImmutableList.Builder<>();
final ImmutableList.Builder<Record> records = new ImmutableList.Builder<Record>();

try {
final GetRecordsRequest request = new GetRecordsRequest();
request.setShardIterator(shardIterator);
@@ -93,20 +93,22 @@ public Records getNext(int maxNumberOfRecords)
+ maxNumberOfRecords + ").");
}

shardIterator = result.getNextShardIterator();
shardIterator = result.getNextShardIterator();
} catch (AmazonClientException e) {
// We'll treat this equivalent to fetching 0 records - the spout drives the retry as part of nextTuple()
// We don't sleep here - we can continue processing ack/fail on the spout thread.
LOG.error(this + "Caught exception when fetching records for " + shardId, e);
LOG.error(this + "Caught exception when fetching records for " + shard.getShardId(), e);
}

return new Records(records.build(), shardIterator == null);
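// A null next-iterator means this shard has been closed. If the shard was still open when
// the shard list was fetched, it has since been split or merged, i.e. a reshard occurred.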
final boolean endOfShard = shardIterator == null;
final boolean reshard = endOfShard && shard.getShardOpen();
return new Records(records.build(), endOfShard, reshard);
}

@Override
public void seek(ShardPosition position)
throws AmazonClientException, ResourceNotFoundException, InvalidSeekPositionException {
LOG.info("Seeking to " + position);
LOG.info(this + " Seeking to " + position + " for shard " + shard.getShardId());

ShardIteratorType iteratorType;
String seqNum = null;
@@ -145,20 +147,20 @@ public void seek(ShardPosition position)

@Override
public String getAssociatedShard() {
return shardId;
return shard.getShardId();
}

@Override
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("shardId", shardId).toString();
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("shardId", shard.getShardId()).toString();
}

private String seek(final ShardIteratorType iteratorType, final String seqNum)
throws AmazonClientException, ResourceNotFoundException, InvalidArgumentException {
final GetShardIteratorRequest request = new GetShardIteratorRequest();

request.setStreamName(streamName);
request.setShardId(shardId);
request.setShardId(shard.getShardId());
request.setShardIteratorType(iteratorType);

// SeqNum is only set on {AT, AFTER}_SEQUENCE_NUMBER, so this is safe.
@@ -16,6 +16,7 @@
package com.amazonaws.services.kinesis.stormspout;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedMap;

/**
* Builds KinesisShardGetters.
@@ -31,7 +32,7 @@ class KinesisShardGetterBuilder implements IShardGetterBuilder {

/**
* Constructor.
*
*
* @param streamName Kinesis stream to create the getters in.
* @param helper Used to get the AmazonKinesisClient object (used by the getters).
*/
@@ -47,12 +48,15 @@ public KinesisShardGetterBuilder(final String streamName,

@Override
public ImmutableList<IShardGetter> buildGetters(ImmutableList<String> shardAssignment) {
ImmutableList.Builder<IShardGetter> builder = new ImmutableList.Builder<>();
ImmutableList.Builder<IShardGetter> builder = new ImmutableList.Builder<IShardGetter>();
ImmutableSortedMap<String, ShardInfo> shards = helper.getShardList();

for (String shard : shardAssignment) {
builder.add(new BufferedGetter(new KinesisShardGetter(streamName, shard, helper.getSharedkinesisClient()),
maxRecordsPerCall,
emptyRecordListBackoffMillis));
for (String shardId : shardAssignment) {
KinesisShardGetter getter = new KinesisShardGetter(
streamName,
shards.get(shardId),
helper.getSharedkinesisClient());
builder.add(new BufferedGetter(getter, maxRecordsPerCall, emptyRecordListBackoffMillis));
}

return builder.build();
@@ -36,35 +36,44 @@
import com.amazonaws.services.kinesis.stormspout.state.IKinesisSpoutStateManager;
import com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperStateManager;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedMap;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

/**
* Storm spout for Amazon Kinesis. The spout fetches data from Kinesis and emits a tuple for each data record.
*
*
* Note: every spout task handles a distinct set of shards.
*/
public class KinesisSpout implements IRichSpout, Serializable {
private static final long serialVersionUID = 7707829996758189836L;
private static final Logger LOG = LoggerFactory.getLogger(KinesisSpout.class);

private final InitialPositionInStream initialPosition;

// Initialized before open
private final InitialPositionInStream initialPosition;
private final KinesisHelper shardListGetter;
private final KinesisSpoutConfig config;
private final IShardListGetter shardListGetter;
private final long emptyRecordListSleepTimeMillis = 5L;
private final DeadLetterQueue deadLetterQueue;
private final IShardGetterBuilder getterBuilder;
private long emptyRecordListSleepTimeMillis = 5L;

// Initialized on open
private transient SpoutOutputCollector collector;
private transient TopologyContext context;
private transient IKinesisSpoutStateManager stateManager;
private transient ZookeeperStateManager stateManager;
private transient long lastCommitTime;

/**
* Constructs an instance of the spout with just enough data to bootstrap the state from.
* Construction done here is common to all spout tasks, whereas the IKinesisSpoutStateManager created
* in activate() is task specific.
*
*
* @param config Spout configuration.
* @param credentialsProvider Used when making requests to Kinesis.
* @param clientConfiguration Client configuration used when making calls to Kinesis.
@@ -86,20 +95,6 @@ public KinesisSpout(KinesisSpoutConfig config,
this.initialPosition = config.getInitialPositionInStream();
}

/**
* @param config Spout configuration.
* @param shardListGetter Used to list the shards in the stream.
* @param getterBuilder Used for creating shard getters for a task.
*/
KinesisSpout(final KinesisSpoutConfig config,
final IShardListGetter shardListGetter,
final IShardGetterBuilder getterBuilder) {
this.config = config;
this.shardListGetter = shardListGetter;
this.getterBuilder = getterBuilder;
this.initialPosition = config.getInitialPositionInStream();
}

@Override
public void open(@SuppressWarnings("rawtypes") final Map conf,
final TopologyContext spoutContext,
@@ -162,7 +157,7 @@ public void nextTuple() {
String currentShardId = getter.getAssociatedShard();
Record rec = null;
boolean isRetry = false;

if (stateManager.shouldRetry(currentShardId)) {
rec = stateManager.recordToRetry(currentShardId);
if (LOG.isDebugEnabled()) {
@@ -171,21 +166,23 @@ public void nextTuple() {
}
isRetry = true;
} else {
final ImmutableList<Record> records = getter.getNext(1).getRecords();
if ((records != null) && (!records.isEmpty())) {
rec = records.get(0);
final Records records = getter.getNext(1);
final ImmutableList<Record> recordList = records.getRecords();
if ((recordList != null) && (!recordList.isEmpty())) {
rec = recordList.get(0);
}
if (records.isReshard()) {
LOG.info(this + " detected reshard event for shard " + currentShardId);
stateManager.handleReshard();
}
}

if (rec != null) {
// Copy record (ByteBuffer.duplicate()) so bolts in the same JVM don't affect the object (e.g. retries)
Record recordToEmit = copyRecord(rec);
List<Object> tuple = config.getScheme().deserialize(recordToEmit);
if (LOG.isDebugEnabled()) {
LOG.debug(this + " emitting record with seqnum " + recordToEmit.getSequenceNumber() + " from shard "
+ currentShardId + ".");
}

LOG.info(this + " emitting record with seqnum " + recordToEmit.getSequenceNumber() + " from shard "
+ currentShardId + " with data: " + tuple);
collector.emit(tuple, MessageIdUtil.constructMessageId(currentShardId, recordToEmit.getSequenceNumber()));
stateManager.emit(currentShardId, recordToEmit, isRetry);
} else {
Expand All @@ -212,7 +209,7 @@ public void nextTuple() {
/**
* Creates a copy of the record so we don't get interference from bolts that execute in the same JVM.
* We invoke ByteBuffer.duplicate() so the ByteBuffer state is decoupled.
*
*
* @param record Kinesis record
* @return Copied record.
*/
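The body of copyRecord is collapsed in this view. A minimal sketch of what such a copy can look like, assuming the standard Record accessors from the AWS SDK; the fields the spout actually copies may differ:

```java
package com.amazonaws.services.kinesis.stormspout;

import com.amazonaws.services.kinesis.model.Record;

final class RecordCopySketch {
    /** Duplicate the ByteBuffer so bolts in the same JVM cannot disturb the original's position. */
    static Record copyRecord(final Record rec) {
        Record copy = new Record();
        copy.setPartitionKey(rec.getPartitionKey());
        copy.setSequenceNumber(rec.getSequenceNumber());
        copy.setData(rec.getData().duplicate());
        return copy;
    }
}
```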
@@ -25,16 +25,19 @@
class Records {
private final ImmutableList<Record> records;
private final boolean endOfShard;
private final boolean reshard;

/**
* Constructor.
*
* @param records Kinesis records
* @param endOfShard Did we reach the end of the shard?
* @param reshard Has a reshard (shard split or merge) event occurred?
*/
Records(final ImmutableList<Record> records, final boolean endOfShard) {
Records(final ImmutableList<Record> records, final boolean endOfShard, final boolean reshard) {
this.records = records;
this.endOfShard = endOfShard;
this.reshard = reshard;
}

/**
@@ -49,7 +52,7 @@ static Records empty() {
* @return a new empty set of records for an open or closed shard.
*/
static Records empty(final boolean closed) {
return new Records(ImmutableList.<Record> of(), closed);
return new Records(ImmutableList.<Record> of(), closed, false);
}

/**
@@ -66,6 +69,13 @@ boolean isEndOfShard() {
return endOfShard;
}

/**
* @return true if a reshard event was detected.
*/
boolean isReshard() {
return reshard;
}

/**
* Does the Records instance contain records?
*
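The remainder of Records is collapsed in this view. A hypothetical JUnit sketch (not part of this commit) that exercises the new flag via the constructor and factory shown above; it assumes JUnit 4 on the test classpath and lives in the same package because Records is package-private.

```java
package com.amazonaws.services.kinesis.stormspout;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import com.amazonaws.services.kinesis.model.Record;
import com.google.common.collect.ImmutableList;
import org.junit.Test;

public class RecordsReshardFlagTest {
    @Test
    public void reshardFlagIsExposed() {
        // Constructor shown in this commit: (records, endOfShard, reshard).
        Records records = new Records(ImmutableList.<Record>of(), true, true);
        assertTrue(records.isEndOfShard());
        assertTrue(records.isReshard());
    }

    @Test
    public void emptyRecordsNeverSignalReshard() {
        // Records.empty(closed) always passes reshard = false.
        assertFalse(Records.empty(true).isReshard());
    }
}
```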