Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make KPL consumers more testable #136

Merged
merged 1 commit into from
Nov 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class Attempt {
private String errorCode;
private boolean success;

private Attempt(int delay, int duration, String errorMessage, String errorCode, boolean success) {
public Attempt(int delay, int duration, String errorMessage, String errorCode, boolean success) {
this.delay = delay;
this.duration = duration;
this.errorMessage = errorMessage;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.amazonaws.services.kinesis.producer;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutionException;
import com.google.common.util.concurrent.ListenableFuture;


public interface IKinesisProducer {
ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, ByteBuffer data);

ListenableFuture<UserRecordResult> addUserRecord(UserRecord userRecord);

ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data);

int getOutstandingRecordsCount();

List<Metric> getMetrics(String metricName, int windowSeconds) throws InterruptedException, ExecutionException;

List<Metric> getMetrics(String metricName) throws InterruptedException, ExecutionException;

List<Metric> getMetrics() throws InterruptedException, ExecutionException;

List<Metric> getMetrics(int windowSeconds) throws InterruptedException, ExecutionException;

void destroy();

void flush(String stream);

void flush();

void flushSync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
* @author chaodeng
*
*/
public class KinesisProducer {
public class KinesisProducer implements IKinesisProducer {
private static final Logger log = LoggerFactory.getLogger(KinesisProducer.class);

private static final BigInteger UINT_128_MAX = new BigInteger(StringUtils.repeat("FF", 16), 16);
Expand Down Expand Up @@ -344,10 +344,68 @@ protected KinesisProducer(File inPipe, File outPipe) {
* @see KinesisProducerConfiguration#setRecordTtl(long)
* @see UserRecordFailedException
*/
@Override
public ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, ByteBuffer data) {
return addUserRecord(stream, partitionKey, null, data);
}


/**
* Put a record asynchronously. A {@link ListenableFuture} is returned that
* can be used to retrieve the result, either by polling or by registering a
* callback.
*
* <p>
* The return value can be disregarded if you do not wish to process the
* result. Under the covers, the KPL will automatically re-attempt puts in
* case of transient errors (including throttling). A failed result is
* generally returned only if an irrecoverable error is detected (e.g.
* trying to put to a stream that doesn't exist), or if the record expires.
*
* <p>
* <b>Thread safe.</b>
*
* <p>
* To add a listener to the future:
* <p>
* <code>
* ListenableFuture&lt;PutRecordResult&gt; f = myKinesisProducer.addUserRecord(...);
* com.google.common.util.concurrent.Futures.addCallback(f, callback, executor);
* </code>
* <p>
* where <code>callback</code> is an instance of
* {@link com.google.common.util.concurrent.FutureCallback} and
* <code>executor</code> is an instance of
* {@link java.util.concurrent.Executor}.
* <p>
* <b>Important:</b>
* <p>
* If long-running tasks are performed in the callbacks, it is recommended
* that a custom executor be provided when registering callbacks to ensure
* that there are enough threads to achieve the desired level of
* parallelism. By default, the KPL will use an internal thread pool to
* execute callbacks, but this pool may not have a sufficient number of
* threads if a large number is desired.
* <p>
* Another option would be to hand the result off to a different component
* for processing and keep the callback routine fast.
*
* @param userRecord
* All data necessary to write to the stream.
* @return A future for the result of the put.
* @throws IllegalArgumentException
* if input does not meet stated constraints
* @throws DaemonException
* if the child process is dead
* @see ListenableFuture
* @see UserRecordResult
* @see KinesisProducerConfiguration#setRecordTtl(long)
* @see UserRecordFailedException
*/
@Override
public ListenableFuture<UserRecordResult> addUserRecord(UserRecord userRecord) {
return addUserRecord(userRecord.getStreamName(), userRecord.getPartitionKey(), userRecord.getExplicitHashKey(), userRecord.getData());
}

/**
* Put a record asynchronously. A {@link ListenableFuture} is returned that
* can be used to retrieve the result, either by polling or by registering a
Expand Down Expand Up @@ -410,6 +468,7 @@ public ListenableFuture<UserRecordResult> addUserRecord(String stream, String pa
* @see KinesisProducerConfiguration#setRecordTtl(long)
* @see UserRecordFailedException
*/
@Override
public ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data) {
if (stream == null) {
throw new IllegalArgumentException("Stream name cannot be null");
Expand Down Expand Up @@ -490,6 +549,7 @@ public ListenableFuture<UserRecordResult> addUserRecord(String stream, String pa
*
* @return The number of unfinished records currently being processed.
*/
@Override
public int getOutstandingRecordsCount() {
return futures.size();
}
Expand Down Expand Up @@ -530,6 +590,7 @@ public int getOutstandingRecordsCount() {
* from the child process.
* @see Metric
*/
@Override
public List<Metric> getMetrics(String metricName, int windowSeconds) throws InterruptedException, ExecutionException {
MetricsRequest.Builder mrb = MetricsRequest.newBuilder();
if (metricName != null) {
Expand Down Expand Up @@ -587,6 +648,7 @@ public List<Metric> getMetrics(String metricName, int windowSeconds) throws Inte
* from the child process.
* @see Metric
*/
@Override
public List<Metric> getMetrics(String metricName) throws InterruptedException, ExecutionException {
return getMetrics(metricName, -1);
}
Expand Down Expand Up @@ -624,6 +686,7 @@ public List<Metric> getMetrics(String metricName) throws InterruptedException, E
* from the child process.
* @see Metric
*/
@Override
public List<Metric> getMetrics() throws InterruptedException, ExecutionException {
return getMetrics(null);
}
Expand Down Expand Up @@ -661,6 +724,7 @@ public List<Metric> getMetrics() throws InterruptedException, ExecutionException
* from the child process.
* @see Metric
*/
@Override
public List<Metric> getMetrics(int windowSeconds) throws InterruptedException, ExecutionException {
return getMetrics(null, windowSeconds);
}
Expand All @@ -684,6 +748,7 @@ public List<Metric> getMetrics(int windowSeconds) throws InterruptedException, E
* terminate the child process. If you are terminating the JVM then calling
* destroy is unnecessary since it will be done automatically.
*/
@Override
public void destroy() {
destroyed = true;
child.destroy();
Expand All @@ -706,6 +771,7 @@ public void destroy() {
* @throws DaemonException
* if the child process is dead
*/
@Override
public void flush(String stream) {
Flush.Builder f = Flush.newBuilder();
if (stream != null) {
Expand Down Expand Up @@ -733,6 +799,7 @@ public void flush(String stream) {
* @throws DaemonException
* if the child process is dead
*/
@Override
public void flush() {
flush(null);
}
Expand All @@ -757,6 +824,7 @@ public void flush() {
* @see KinesisProducerConfiguration#setRecordTtl(long)
* @see KinesisProducerConfiguration#setRequestTimeout(long)
*/
@Override
public void flushSync() {
while (getOutstandingRecordsCount() > 0) {
flush();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package com.amazonaws.services.kinesis.producer;

import java.nio.ByteBuffer;

public class UserRecord {
/**
* Stream to put to.
*/
private String streamName;

/**
* Partition key. Length must be at least one, and at most 256 (inclusive).
*/
private String partitionKey;

/**
* The hash value used to explicitly determine the shard the data
* record is assigned to by overriding the partition key hash.
* Must be a valid string representation of a positive integer
* with value between 0 and <tt>2^128 - 1</tt> (inclusive).
*/
private String explicitHashKey;

/**
* Binary data of the record. Maximum size 1MiB.
*/
private ByteBuffer data;

public UserRecord() {
}

public UserRecord(String streamName, String partitionKey, ByteBuffer data) {
this.streamName = streamName;
this.partitionKey = partitionKey;
this.data = data;
}

public UserRecord(String streamName, String partitionKey, String explicitHashKey, ByteBuffer data) {
this.streamName = streamName;
this.partitionKey = partitionKey;
this.explicitHashKey = explicitHashKey;
this.data = data;
}

public String getStreamName() {
return streamName;
}

public void setStreamName(String streamName) {
this.streamName = streamName;
}

public UserRecord withStreamName(String streamName) {
this.streamName = streamName;
return this;
}

public String getPartitionKey() {
return partitionKey;
}

public void setPartitionKey(String partitionKey) {
this.partitionKey = partitionKey;
}

public UserRecord withPartitionKey(String partitionKey) {
this.partitionKey = partitionKey;
return this;
}

public ByteBuffer getData() {
return data;
}

public void setData(ByteBuffer data) {
this.data = data;
}

public UserRecord withData(ByteBuffer byteBuffer) {
this.data = byteBuffer;
return this;
}

public String getExplicitHashKey() {
return explicitHashKey;
}

public void setExplicitHashKey(String explicitHashKey) {
this.explicitHashKey = explicitHashKey;
}

public UserRecord withExplicitHashKey(String explicitHashKey) {
this.explicitHashKey = explicitHashKey;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class UserRecordResult {
private String shardId;
private boolean successful;

private UserRecordResult(List<Attempt> attempts, String sequenceNumber, String shardId, boolean successful) {
public UserRecordResult(List<Attempt> attempts, String sequenceNumber, String shardId, boolean successful) {
this.attempts = attempts;
this.sequenceNumber = sequenceNumber;
this.shardId = shardId;
Expand Down