Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public static class Builder<S extends CoordinatorShard<U>, U> {
private CoordinatorMetrics coordinatorMetrics;
private Serializer<U> serializer;
private Compression compression;
private int appendLingerMs;
private OptionalInt appendLingerMs;
private ExecutorService executorService;

public Builder<S, U> withLogPrefix(String logPrefix) {
Expand Down Expand Up @@ -186,6 +186,11 @@ public Builder<S, U> withCompression(Compression compression) {
}

/**
 * Sets a fixed append linger time, expressed in milliseconds.
 * The value is wrapped in an {@code OptionalInt} and handed to the
 * {@code OptionalInt} overload of this method.
 *
 * @param appendLingerMs The linger time in milliseconds.
 * @return This builder.
 */
public Builder<S, U> withAppendLingerMs(int appendLingerMs) {
    return withAppendLingerMs(OptionalInt.of(appendLingerMs));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Judging from its references, this method is only used by test code. Should we remove it and update the test cases to use the OptionalInt overload instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, we can remove the method.


/**
 * Sets the append linger time.
 * A present value is a linger time in milliseconds; {@code OptionalInt.empty()}
 * indicates an adaptive linger time. If this is left {@code null},
 * {@code build()} substitutes {@code OptionalInt.empty()}.
 *
 * @param appendLingerMs The linger time, or empty for adaptive linger.
 * @return This builder.
 */
public Builder<S, U> withAppendLingerMs(OptionalInt appendLingerMs) {
this.appendLingerMs = appendLingerMs;
return this;
}
Expand All @@ -195,6 +200,7 @@ public Builder<S, U> withExecutorService(ExecutorService executorService) {
return this;
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
public CoordinatorRuntime<S, U> build() {
if (logPrefix == null)
logPrefix = "";
Expand All @@ -220,8 +226,10 @@ public CoordinatorRuntime<S, U> build() {
throw new IllegalArgumentException("Serializer must be set.");
if (compression == null)
compression = Compression.NONE;
if (appendLingerMs < 0)
throw new IllegalArgumentException("AppendLinger must be >= 0");
if (appendLingerMs == null)
appendLingerMs = OptionalInt.empty();
Comment on lines +224 to +225
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the production code, the appendLingerMs input can never be null. How about giving appendLingerMs a default value of OptionalInt.empty() and removing the appendLingerMs == null check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was following the existing pattern in the builder. We don't set default values for logPrefix, logContext and compression; instead, we initialize them in build().

// A present value is a fixed linger time in milliseconds and must be non-negative.
// Adaptive linger is expressed as OptionalInt.empty(), so a present -1 is rejected too.
if (appendLingerMs.isPresent() && appendLingerMs.getAsInt() < 0)
    throw new IllegalArgumentException("AppendLinger must be empty or >= 0");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the configured appendLingerMs is -1, it is passed in as OptionalInt.empty(). How about checking that the value is >= 0 here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! The check is supposed to reject -1.

if (executorService == null)
throw new IllegalArgumentException("ExecutorService must be set.");

Expand Down Expand Up @@ -599,6 +607,12 @@ class CoordinatorContext {
*/
CoordinatorBatch currentBatch;

/**
* The batch epoch. Incremented every time a new batch is started.
* Only valid for the lifetime of the CoordinatorContext. The first batch has an epoch of 1.
*/
int batchEpoch;

/**
* Constructor.
*
Expand Down Expand Up @@ -769,6 +783,24 @@ private void freeCurrentBatch() {
currentBatch = null;
}

/**
 * Schedules a flush of one specific batch behind every event already waiting in the
 * event queue. The flush is skipped if that batch is no longer the current batch by
 * the time the event runs.
 *
 * @param expectedBatchEpoch The epoch of the batch that the queued event may flush.
 */
private void enqueueAdaptiveFlush(int expectedBatchEpoch) {
    CoordinatorInternalEvent flushEvent = new CoordinatorInternalEvent("FlushBatch", tp, () -> {
        withActiveContextOrThrow(tp, context -> {
            // Hitting the maximum batch size or receiving a transactional write may
            // already have flushed the targeted batch. Flushing only on an epoch match
            // keeps a newer batch from being flushed ahead of time.
            boolean targetsCurrentBatch =
                context.currentBatch != null && context.batchEpoch == expectedBatchEpoch;
            if (targetsCurrentBatch) {
                context.flushCurrentBatch();
            }
        });
    });
    enqueueLast(flushEvent);
}

/**
* Flushes the current (or pending) batch to the log. When the batch is written
* locally, a new snapshot is created in the snapshot registry and the events
Expand Down Expand Up @@ -833,7 +865,11 @@ private void flushCurrentBatch() {
*/
private void maybeFlushCurrentBatch(long currentTimeMs) {
if (currentBatch != null) {
if (currentBatch.builder.isTransactional() || (currentTimeMs - currentBatch.appendTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
if (currentBatch.builder.isTransactional() ||
// When adaptive linger time is enabled, we avoid flushing here.
// Instead, we rely on the flush event enqueued at the back of the event queue.
(appendLingerMs.isPresent() && (currentTimeMs - currentBatch.appendTimeMs) >= appendLingerMs.getAsInt()) ||
!currentBatch.builder.hasRoomFor(0)) {
flushCurrentBatch();
}
}
Expand Down Expand Up @@ -882,20 +918,31 @@ private void maybeAllocateNewBatch(
maxBatchSize
);

batchEpoch++;

Optional<TimerTask> lingerTimeoutTask = Optional.empty();
if (appendLingerMs > 0) {
lingerTimeoutTask = Optional.of(new TimerTask(appendLingerMs) {
@Override
public void run() {
// An event to flush the batch is pushed to the front of the queue
// to ensure that the linger time is respected.
enqueueFirst(new CoordinatorInternalEvent("FlushBatch", tp, () -> {
if (this.isCancelled()) return;
withActiveContextOrThrow(tp, CoordinatorContext::flushCurrentBatch);
}));
}
});
CoordinatorRuntime.this.timer.add(lingerTimeoutTask.get());
if (appendLingerMs.isPresent()) {
if (appendLingerMs.getAsInt() > 0) {
lingerTimeoutTask = Optional.of(new TimerTask(appendLingerMs.getAsInt()) {
@Override
public void run() {
// An event to flush the batch is pushed to the front of the queue
// to ensure that the linger time is respected.
enqueueFirst(new CoordinatorInternalEvent("FlushBatch", tp, () -> {
if (this.isCancelled()) return;
withActiveContextOrThrow(tp, CoordinatorContext::flushCurrentBatch);
}));
}
});
CoordinatorRuntime.this.timer.add(lingerTimeoutTask.get());
}
} else {
// Always queue a flush immediately at the end of the queue, unless the batch is
// transactional. Transactional batches are flushed immediately at the end of
// the write, so a flush event is never needed.
if (!builder.isTransactional()) {
enqueueAdaptiveFlush(batchEpoch);
}
}

currentBatch = new CoordinatorBatch(
Expand Down Expand Up @@ -1991,9 +2038,10 @@ public void onHighWatermarkUpdated(

/**
* The duration in milliseconds that the coordinator will wait for writes to
* accumulate before flushing them to disk.
* accumulate before flushing them to disk. {@code OptionalInt.empty()} indicates
* an adaptive linger time based on the workload.
*/
private final int appendLingerMs;
private final OptionalInt appendLingerMs;

/**
* The executor service used by the coordinator runtime to schedule
Expand Down Expand Up @@ -2045,7 +2093,7 @@ private CoordinatorRuntime(
CoordinatorMetrics coordinatorMetrics,
Serializer<U> serializer,
Compression compression,
int appendLingerMs,
OptionalInt appendLingerMs,
ExecutorService executorService
) {
this.logPrefix = logPrefix;
Expand Down
Loading
Loading