Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Everything else #971

Open
wants to merge 3 commits into
base: v1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,24 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
private static final int TIME_TO_KEEP_ALIVE = 5;
private static final int CORE_THREAD_POOL_COUNT = 1;

private final KinesisDataFetcher dataFetcher;
private final IDataFetcher dataFetcher;
private final ExecutorService executorService;
private final int retryGetRecordsInSeconds;
private final String shardId;
final Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier;

public AsynchronousGetRecordsRetrievalStrategy(@NonNull final KinesisDataFetcher dataFetcher,
public AsynchronousGetRecordsRetrievalStrategy(@NonNull final IDataFetcher dataFetcher,
final int retryGetRecordsInSeconds, final int maxGetRecordsThreadPool, String shardId) {
this(dataFetcher, buildExector(maxGetRecordsThreadPool, shardId), retryGetRecordsInSeconds, shardId);
}

public AsynchronousGetRecordsRetrievalStrategy(final KinesisDataFetcher dataFetcher,
public AsynchronousGetRecordsRetrievalStrategy(final IDataFetcher dataFetcher,
final ExecutorService executorService, final int retryGetRecordsInSeconds, String shardId) {
this(dataFetcher, executorService, retryGetRecordsInSeconds, () -> new ExecutorCompletionService<>(executorService),
shardId);
}

AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService,
AsynchronousGetRecordsRetrievalStrategy(IDataFetcher dataFetcher, ExecutorService executorService,
int retryGetRecordsInSeconds, Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier,
String shardId) {
this.dataFetcher = dataFetcher;
Expand Down Expand Up @@ -148,7 +148,7 @@ private static ExecutorService buildExector(int maxGetRecordsThreadPool, String
}

@Override
public KinesisDataFetcher getDataFetcher() {
public IDataFetcher getDataFetcher() {
return dataFetcher;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* If we don't find a checkpoint for the parent shard(s), we assume they have been trimmed and directly
* proceed with processing data from the shard.
*/
class BlockOnParentShardTask implements ITask {
public class BlockOnParentShardTask implements ITask {

private static final Log LOG = LogFactory.getLog(BlockOnParentShardTask.class);
private final ShardInfo shardInfo;
Expand All @@ -45,9 +45,9 @@ class BlockOnParentShardTask implements ITask {
* @param leaseManager Used to fetch the lease and checkpoint info for parent shards
* @param parentShardPollIntervalMillis Sleep time if the parent shard has not completed processing
*/
BlockOnParentShardTask(ShardInfo shardInfo,
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis) {
public BlockOnParentShardTask(ShardInfo shardInfo,
ILeaseManager<KinesisClientLease> leaseManager,
long parentShardPollIntervalMillis) {
this.shardInfo = shardInfo;
this.leaseManager = leaseManager;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public interface GetRecordsRetrievalStrategy {
boolean isShutdown();

/**
* Returns the KinesisDataFetcher used to getRecords from Kinesis.
* Returns the IDataFetcher used to getRecords
*
* @return KinesisDataFetcher
* @return IDataFetcher
*/
KinesisDataFetcher getDataFetcher();
IDataFetcher getDataFetcher();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.model.ChildShard;

import java.util.List;

public interface IDataFetcher {

DataFetcherResult getRecords(int maxRecords);

void initialize(String initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream);

void initialize(ExtendedSequenceNumber initialCheckpoint, InitialPositionInStreamExtended initialPositionInStream);

void advanceIteratorTo(String sequenceNumber, InitialPositionInStreamExtended initialPositionInStream);

void restartIterator();

boolean isShardEndReached();

List<ChildShard> getChildShards();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

public interface IShardConsumer {

boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist();

enum TaskOutcome {
SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE, LEASE_NOT_FOUND
}

boolean consumeShard();

boolean isShutdown();

ShutdownReason getShutdownReason();

boolean beginShutdown();

void notifyShutdownRequested(ShutdownNotification shutdownNotification);

KinesisConsumerStates.ShardConsumerState getCurrentState();

boolean isShutdownRequested();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;

import java.util.Optional;
import java.util.concurrent.ExecutorService;

public interface IShardConsumerFactory {

/**
* Returns a shard consumer to be used for consuming a (assigned) shard.
*
* @return Returns a shard consumer object.
*/
IShardConsumer createShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpointTracker,
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis,
boolean cleanupLeasesUponShardCompletion,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long taskBackoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy,
LeaseCleanupManager leaseCleanupManager);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@
/**
* Task for initializing shard position and invoking the RecordProcessor initialize() API.
*/
class InitializeTask implements ITask {
public class InitializeTask implements ITask {

private static final Log LOG = LogFactory.getLog(InitializeTask.class);

private static final String RECORD_PROCESSOR_INITIALIZE_METRIC = "RecordProcessor.initialize";

private final ShardInfo shardInfo;
private final IRecordProcessor recordProcessor;
private final KinesisDataFetcher dataFetcher;
private final IDataFetcher dataFetcher;
private final TaskType taskType = TaskType.INITIALIZE;
private final ICheckpoint checkpoint;
private final RecordProcessorCheckpointer recordProcessorCheckpointer;
Expand All @@ -49,14 +49,14 @@ class InitializeTask implements ITask {
/**
* Constructor.
*/
InitializeTask(ShardInfo shardInfo,
IRecordProcessor recordProcessor,
ICheckpoint checkpoint,
RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisDataFetcher dataFetcher,
long backoffTimeMillis,
StreamConfig streamConfig,
GetRecordsCache getRecordsCache) {
public InitializeTask(ShardInfo shardInfo,
IRecordProcessor recordProcessor,
ICheckpoint checkpoint,
RecordProcessorCheckpointer recordProcessorCheckpointer,
IDataFetcher dataFetcher,
long backoffTimeMillis,
StreamConfig streamConfig,
GetRecordsCache getRecordsCache) {
this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor;
this.checkpoint = checkpoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Set;

import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import org.apache.commons.lang3.Validate;

import com.amazonaws.ClientConfiguration;
Expand Down Expand Up @@ -61,7 +62,7 @@ public class KinesisClientLibConfiguration {
public static final int DEFAULT_MAX_RECORDS = 10000;

/**
* The default value for how long the {@link ShardConsumer} should sleep if no records are returned from the call to
* The default value for how long the {@link KinesisShardConsumer} should sleep if no records are returned from the call to
* {@link com.amazonaws.services.kinesis.AmazonKinesis#getRecords(com.amazonaws.services.kinesis.model.GetRecordsRequest)}.
*/
public static final long DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000L;
Expand Down Expand Up @@ -627,7 +628,7 @@ public KinesisClientLibConfiguration(String applicationName,
* @param billingMode The DDB Billing mode to set for lease table creation.
* @param recordsFetcherFactory Factory to create the records fetcher to retrieve data from Kinesis for a given shard.
* @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in
* {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager}
* {@link LeaseCleanupManager}
* @param completedLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any completed leases
* (leases for shards which have been closed as a result of a resharding operation) that need to be cleaned up.
* @param garbageLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any garbage leases
Expand Down Expand Up @@ -926,7 +927,7 @@ public boolean shouldCleanupLeasesUponShardCompletion() {
}

/**
* @return Interval in millis at which to run lease cleanup thread in {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager}
* @return Interval in millis at which to run lease cleanup thread in {@link LeaseCleanupManager}
*/
public long leaseCleanupIntervalMillis() {
return leaseCleanupIntervalMillis;
Expand Down Expand Up @@ -1030,7 +1031,7 @@ public int getInitialLeaseTableWriteCapacity() {
* Keeping it protected to forbid outside callers from depending on this internal object.
* @return The initialPositionInStreamExtended object.
*/
protected InitialPositionInStreamExtended getInitialPositionInStreamExtended() {
public InitialPositionInStreamExtended getInitialPositionInStreamExtended() {
return initialPositionInStreamExtended;
}

Expand Down Expand Up @@ -1623,7 +1624,7 @@ public KinesisClientLibConfiguration withMaxInitializationAttempts(int maxInitia

/**
* @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in
* {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager}
* {@link LeaseCleanupManager}
* @return
*/
public KinesisClientLibConfiguration withLeaseCleanupIntervalMillis(long leaseCleanupIntervalMillis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
/**
* This class is used to coordinate/manage leases owned by this worker process and to get/set checkpoints.
*/
class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLease> implements ICheckpoint {
public class KinesisClientLibLeaseCoordinator extends LeaseCoordinator<KinesisClientLease> implements ICheckpoint {

private static final Log LOG = LogFactory.getLog(KinesisClientLibLeaseCoordinator.class);

Expand Down Expand Up @@ -368,7 +368,7 @@ void runLeaseRenewer() throws DependencyException, InvalidStateException {
*
* @return LeaseManager
*/
ILeaseManager<KinesisClientLease> getLeaseManager() {
public ILeaseManager<KinesisClientLease> getLeaseManager() {
return leaseManager;
}

Expand Down
Loading