Skip to content

Commit

Permalink
WIP - smoke tests fail (consumer looses records)
Browse files Browse the repository at this point in the history
  • Loading branch information
psolomin committed Feb 15, 2023
1 parent 0bdfa2b commit 7a6ab3a
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import org.checkerframework.checker.nullness.qual.Nullable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
Expand All @@ -41,6 +44,7 @@
import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler;

class EFOShardSubscriber {
private static final Logger LOG = LoggerFactory.getLogger(EFOShardSubscriber.class);
private final EFOShardSubscribersPool pool;
private final String consumerArn;

Expand All @@ -60,42 +64,25 @@ class EFOShardSubscriber {

private final ShardEventsSubscriber eventsSubscriber = new ShardEventsSubscriber();

/** Tracks number of delivered events in flight (until ack-ed). */
AtomicInteger inFlight;

private static final Integer IN_FLIGHT_LIMIT = 3;

/**
* Async completion handler for {@link AsyncClientProxy#subscribeToShard} that:
* Async completion handler for {@link KinesisAsyncClient#subscribeToShard} that:
* <li>exists immediately if {@link #done} is already completed (exceptionally),
* <li>re-subscribes at {@link ShardEventsSubscriber#sequenceNumber} for retryable errors such as
* retryable {@link SdkException}, {@link ClosedChannelException}, {@link ChannelException},
* {@link TimeoutException} (any of these might be wrapped in {@link CompletionException}s)
* <li>or completes {@link #done} exceptionally for any other error,
* <li>completes {@done} normally if subscriber {@link #isStopped} or if shard completed (no
* <li>completes {@link #done} normally if subscriber {@link #isStopped} or if shard completed (no
* further {@link ShardEventsSubscriber#sequenceNumber}),
* <li>or otherwise re-subscribes at {@link ShardEventsSubscriber#sequenceNumber}.
*/
@SuppressWarnings({"FutureReturnValueIgnored", "all"})
BiConsumer<Void, Throwable> reSubscriptionHandler =
(Void unused, Throwable error) -> {
if (error != null && !isRetryAble(error)) {
done.completeExceptionally(error);
} else if (!isStopped) {
String lastContinuationSequenceNumber = eventsSubscriber.sequenceNumber;
if (lastContinuationSequenceNumber == null) {
done.complete(null); // completely consumed this shard, done
} else {
subscribe(
StartingPosition.builder()
.type(ShardIteratorType.AT_SEQUENCE_NUMBER)
.sequenceNumber(lastContinuationSequenceNumber)
.build());
}
} else {
done.complete(null);
}
};
private final BiConsumer<Void, Throwable> reSubscriptionHandler;

/** Tracks number of delivered events in flight (until ack-ed). */
AtomicInteger inFlight;

private static final Integer IN_FLIGHT_LIMIT = 3;
private static final Long ON_ERROR_COOL_DOWN_MS = 5_000L;

/**
* TODO: add 2 other retry-able cases
Expand All @@ -105,7 +92,15 @@ class EFOShardSubscriber {
*/
private static boolean isRetryAble(Throwable error) {
Throwable cause = unWrapCompletionException(error);
return cause instanceof ReadTimeoutException;
if (cause instanceof SdkClientException
&& cause.getCause() != null
&& cause.getCause() instanceof ReadTimeoutException) {
return true;
}
if (cause instanceof ReadTimeoutException) {
return true;
}
return false;
}

/**
Expand All @@ -127,6 +122,7 @@ private static Throwable unWrapCompletionException(Throwable completionException
return current;
}

@SuppressWarnings({"FutureReturnValueIgnored", "all"})
public EFOShardSubscriber(
EFOShardSubscribersPool pool,
String shardId,
Expand All @@ -137,6 +133,29 @@ public EFOShardSubscriber(
this.shardId = shardId;
this.kinesis = kinesis;
this.inFlight = new AtomicInteger();
this.reSubscriptionHandler =
(Void unused, Throwable error) -> {
if (error != null && !isRetryAble(error)) {
done.completeExceptionally(error);
} else if (!isStopped) {
String lastContinuationSequenceNumber = eventsSubscriber.sequenceNumber;
if (lastContinuationSequenceNumber == null) {
done.complete(null); // completely consumed this shard, done
} else {
Long delay = (error != null) ? ON_ERROR_COOL_DOWN_MS : 0L;
pool.delayedTask(
() ->
subscribe(
StartingPosition.builder()
.type(ShardIteratorType.AT_SEQUENCE_NUMBER)
.sequenceNumber(lastContinuationSequenceNumber)
.build()),
delay);
}
} else {
done.complete(null);
}
};
}

/**
Expand All @@ -153,10 +172,10 @@ public EFOShardSubscriber(
*/
@SuppressWarnings("FutureReturnValueIgnored")
CompletableFuture<Void> subscribe(StartingPosition position) {
SubscribeToShardRequest request = subscribeRequest(position);
LOG.info("Pool {} Shard {} starting subscribe request {}", pool.getPoolId(), shardId, request);
try {
kinesis
.subscribeToShard(subscribeRequest(position), responseHandler())
.whenCompleteAsync(reSubscriptionHandler);
kinesis.subscribeToShard(request, responseHandler()).whenComplete(reSubscriptionHandler);
return done;
} catch (RuntimeException e) {
done.completeExceptionally(e);
Expand All @@ -181,6 +200,7 @@ private SubscribeToShardResponseHandler responseHandler() {
* ShardEventsSubscriber#cancel()} if defined.
*/
void cancel() {
LOG.info("Pool {} Shard {} cancelling", pool.getPoolId(), shardId);
if (!isStopped && eventsSubscriber != null) {
eventsSubscriber.cancel();
isStopped = true;
Expand Down Expand Up @@ -257,7 +277,7 @@ public void onNext(SubscribeToShardEventStream event) {

@Override
public void onError(Throwable t) {
// nothing to do here, handled in resubscriptionHandler
// nothing to do here, handled in {@link #resubscriptionHandler}
}

/** Unsets {@link #eventsSubscriber} of {@link EFOShardSubscriber}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.aws2.kinesis.enhancedfanout;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

import java.io.IOException;
Expand All @@ -25,9 +26,13 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
Expand Down Expand Up @@ -120,6 +125,8 @@ class EFOShardSubscribersPool {
}
};

private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

// EventRecords iterator that is currently consumed
@Nullable EventRecords current = null;

Expand Down Expand Up @@ -259,7 +266,9 @@ public UnboundedSource.CheckpointMark getCheckpointMark() {
return new KinesisReaderCheckpoint(checkpoints);
}

public boolean stop() {
// TODO:
public boolean stop() throws InterruptedException {
state.forEach((shardId, st) -> st.subscriber.cancel());
return true;
}

Expand Down Expand Up @@ -332,4 +341,17 @@ protected Iterator<KinesisClientRecord> delegate() {
UUID getPoolId() {
return poolId;
}

@SuppressWarnings("FutureReturnValueIgnored")
<T> CompletableFuture<T> delayedTask(Supplier<CompletableFuture<T>> task, long delayMs) {
if (delayMs <= 0) {
return task.get();
}
final CompletableFuture<T> cf = new CompletableFuture<>();
scheduler.schedule(
() -> task.get().handle((t, e) -> e == null ? cf.complete(t) : cf.completeExceptionally(e)),
delayMs,
MILLISECONDS);
return cf;
}
}

0 comments on commit 7a6ab3a

Please sign in to comment.