Skip to content

Commit

Permalink
\- applied PR amazon-archives#6, \n\- applied PR amazon-archives#12, …
Browse files Browse the repository at this point in the history
…\n\- applied PR amazon-archives#23, \n\- applied PR amazon-archives#32, \n\- Register MDC key for TOPOLOGY_NAME, \n\- added Maven Encoding default to properties, \n- added gitignore rules, \n\- updated all dependencies versions in pom.xml
  • Loading branch information
christiangda committed Nov 2, 2016
1 parent 6318f5c commit 20f70a5
Show file tree
Hide file tree
Showing 20 changed files with 481 additions and 355 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
target/
AwsCredentials.properties
.idea
**.iml
22 changes: 15 additions & 7 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<artifactId>kinesis-storm-spout</artifactId>
<packaging>jar</packaging>
<name>Amazon Kinesis Storm Spout for Java</name>
<version>1.1.1</version>
<version>1.2.1-SNAPSHOT</version>
<description> The Amazon Kinesis Storm Spout helps Java developers integrate Amazon Kinesis with Storm.</description>
<url>https://aws.amazon.com/kinesis</url>

Expand All @@ -24,11 +24,14 @@


<properties>
<aws-java-sdk.version>1.7.13</aws-java-sdk.version>
<storm.version>0.9.2-incubating</storm.version>
<curator-framework.version>1.1.3</curator-framework.version>
<guava.version>13.0-final</guava.version>
<commons-lang3.version>3.0</commons-lang3.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<aws-java-sdk.version>1.11.50</aws-java-sdk.version>
<storm.version>1.0.2</storm.version>
<curator-framework.version>3.2.0</curator-framework.version>
<guava.version>20.0</guava.version>
<commons-lang3.version>3.5</commons-lang3.version>
<kryo.version>2.24.0</kryo.version>
</properties>


Expand All @@ -55,10 +58,15 @@
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>com.netflix.curator</groupId>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator-framework.version}</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<version>${kryo.version}</version>
</dependency>
</dependencies>

<developers>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@

package com.amazonaws.services.kinesis.stormspout;

import java.util.Iterator;

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;

import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.stormspout.exceptions.InvalidSeekPositionException;
import com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;

import java.util.Iterator;

/**
* Allows users to do efficient getter.getNext(1) calls in exchange for maybe pulling
Expand All @@ -40,27 +39,27 @@ class BufferedGetter implements IShardGetter {

/**
* Creates a (shard) getter that buffers records.
*
* @param underlyingGetter Unbuffered shard getter.
* @param maxBufferSize Max number of records to fetch from the underlying getter.
*
* @param underlyingGetter Unbuffered shard getter.
* @param maxBufferSize Max number of records to fetch from the underlying getter.
* @param emptyRecordListBackoffMillis Backoff time between GetRecords calls if previous call fetched no records.
*/
public BufferedGetter(final IShardGetter underlyingGetter, final int maxBufferSize, final long emptyRecordListBackoffMillis) {
this(underlyingGetter, maxBufferSize, emptyRecordListBackoffMillis, new TimeProvider());
}

/**
* Used for unit testing.
*
* @param underlyingGetter Unbuffered shard getter
* @param maxBufferSize Max number of records to fetch from the underlying getter
*
* @param underlyingGetter Unbuffered shard getter
* @param maxBufferSize Max number of records to fetch from the underlying getter
* @param emptyRecordListBackoffMillis Backoff time between GetRecords calls if previous call fetched no records.
* @param timeProvider Useful for testing timing based behavior (e.g. backoff)
* @param timeProvider Useful for testing timing based behavior (e.g. backoff)
*/
BufferedGetter(final IShardGetter underlyingGetter,
final int maxBufferSize,
final long emptyRecordListBackoffMillis,
final TimeProvider timeProvider) {
final int maxBufferSize,
final long emptyRecordListBackoffMillis,
final TimeProvider timeProvider) {
this.getter = underlyingGetter;
this.maxBufferSize = maxBufferSize;
this.emptyRecordListBackoffTime = emptyRecordListBackoffMillis;
Expand All @@ -72,7 +71,7 @@ public Records getNext(int maxNumberOfRecords) {
ensureBuffered();

if (!it.hasNext() && buffer.isEndOfShard()) {
return new Records(ImmutableList.<Record> of(), true);
return new Records(ImmutableList.<Record>of(), true, buffer.isReshard());
}

ImmutableList.Builder<Record> recs = new ImmutableList.Builder<>();
Expand All @@ -94,7 +93,7 @@ public Records getNext(int maxNumberOfRecords) {
}
}

return new Records(recs.build(), false);
return new Records(recs.build(), false, buffer.isReshard());
}

@Override
Expand Down Expand Up @@ -132,8 +131,8 @@ private void rebuffer() {
}
}
}
/**

/**
* Time provider - helpful for unit tests of BufferedGetter.
*/
static class TimeProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@

package com.amazonaws.services.kinesis.stormspout;

import com.amazonaws.services.kinesis.model.Record;
import org.apache.storm.tuple.Fields;

import java.util.ArrayList;
import java.util.List;

import backtype.storm.tuple.Fields;

import com.amazonaws.services.kinesis.model.Record;

/**
* Default scheme for emitting Kinesis records as tuples. It emits a tuple of (partitionKey, record).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@

package com.amazonaws.services.kinesis.stormspout;

import java.util.List;

import backtype.storm.tuple.Fields;

import com.amazonaws.services.kinesis.model.Record;
import org.apache.storm.tuple.Fields;

import java.util.List;

/**
* Used to convert Kinesis record into a tuple.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,24 @@

package com.amazonaws.services.kinesis.stormspout;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.AmazonClientException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.stormspout.utils.InfiniteConstantBackoffRetry;
import com.amazonaws.services.kinesis.stormspout.utils.ShardIdComparator;
import com.google.common.collect.ImmutableSortedMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

/**
* Helper class to fetch the shard list from Kinesis, create Kinesis client objects, etc.
Expand All @@ -58,14 +56,14 @@ class KinesisHelper implements IShardListGetter {
private transient Region region;

/**
* @param streamName Kinesis stream name to interact with.
* @param streamName Kinesis stream name to interact with.
* @param kinesisCredsProvider Credentials for authentication with Kinesis.
* @param kinesisClientConfig Configuration for the Kinesis client.
* @param kinesisClientConfig Configuration for the Kinesis client.
*/
KinesisHelper(final String streamName,
final AWSCredentialsProvider kinesisCredsProvider,
final ClientConfiguration kinesisClientConfig,
final Region region) {
final AWSCredentialsProvider kinesisCredsProvider,
final ClientConfiguration kinesisClientConfig,
final Region region) {
this.streamName = streamName;
this.serializedKinesisCredsProvider = SerializationHelper.kryoSerializeObject(kinesisCredsProvider);
this.serializedkinesisClientConfig = SerializationHelper.kryoSerializeObject(kinesisClientConfig);
Expand Down Expand Up @@ -107,21 +105,21 @@ public ImmutableSortedMap<String, ShardInfo> getShardList() {
private DescribeStreamResult getDescribeStreamResult(final DescribeStreamRequest request) {
return new InfiniteConstantBackoffRetry<DescribeStreamResult>(BACKOFF_MILLIS, AmazonClientException.class,
new Callable<DescribeStreamResult>() {
public DescribeStreamResult call() throws Exception {
DescribeStreamResult result = getSharedkinesisClient().describeStream(request);
return result;
}
}).call();
public DescribeStreamResult call() throws Exception {
DescribeStreamResult result = getSharedkinesisClient().describeStream(request);
return result;
}
}).call();
}

/**
* @return new instance of AmazonKinesisClient, with parameters supplied by whatever was passed
* to the KinesisHelper constructor.
* to the KinesisHelper constructor.
*/
private AmazonKinesisClient makeNewKinesisClient() {
AmazonKinesisClient client = new AmazonKinesisClient(getKinesisCredsProvider(), getClientConfiguration());
LOG.info("Using " + getRegion().getName() + " region");
client.setRegion(getRegion());
client.setRegion(getRegion());
return client;
}

Expand Down Expand Up @@ -165,7 +163,8 @@ private String addTruncatedShardList(final Map<String, ShardInfo> spoutShards, f

for (Shard s : streamShards) {
currShard = s.getShardId();
spoutShards.put(s.getShardId(), new ShardInfo(s.getShardId()));
final boolean open = s.getSequenceNumberRange().getEndingSequenceNumber() == null;
spoutShards.put(s.getShardId(), new ShardInfo(s.getShardId(), open));

if (s.getParentShardId() != null && s.getAdjacentParentShardId() != null) {
// It's a merge. Set both parents of the merge to merge into this shard.
Expand Down
Loading

0 comments on commit 20f70a5

Please sign in to comment.