diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 552e7e72effd8..28fc5672f4093 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -117,7 +117,7 @@ public static class Builder<S extends CoordinatorShard<U>, U> {
         private CoordinatorMetrics coordinatorMetrics;
         private Serializer<U> serializer;
         private Compression compression;
-        private int appendLingerMs;
+        private OptionalInt appendLingerMs;
         private ExecutorService executorService;

         public Builder<S, U> withLogPrefix(String logPrefix) {
@@ -185,7 +185,7 @@ public Builder<S, U> withCompression(Compression compression) {
             return this;
         }

-        public Builder<S, U> withAppendLingerMs(int appendLingerMs) {
+        public Builder<S, U> withAppendLingerMs(OptionalInt appendLingerMs) {
             this.appendLingerMs = appendLingerMs;
             return this;
         }
@@ -195,6 +195,7 @@ public Builder<S, U> withExecutorService(ExecutorService executorService) {
             return this;
         }

+        @SuppressWarnings("checkstyle:CyclomaticComplexity")
         public CoordinatorRuntime<S, U> build() {
             if (logPrefix == null)
                 logPrefix = "";
@@ -220,8 +221,10 @@ public CoordinatorRuntime<S, U> build() {
                 throw new IllegalArgumentException("Serializer must be set.");
             if (compression == null)
                 compression = Compression.NONE;
-            if (appendLingerMs < 0)
-                throw new IllegalArgumentException("AppendLinger must be >= 0");
+            if (appendLingerMs == null)
+                appendLingerMs = OptionalInt.empty();
+            if (appendLingerMs.isPresent() && appendLingerMs.getAsInt() < 0)
+                throw new IllegalArgumentException("AppendLinger must be empty or >= 0");
             if (executorService == null)
                 throw new IllegalArgumentException("ExecutorService must be set.");

@@ -599,6 +602,12 @@ class CoordinatorContext {
          */
         CoordinatorBatch currentBatch;

+        /**
+         * The batch epoch. Incremented every time a new batch is started.
+         * Only valid for the lifetime of the CoordinatorContext. The first batch has an epoch of 1.
+         */
+        int batchEpoch;
+
         /**
          * Constructor.
          *
@@ -769,6 +778,24 @@ private void freeCurrentBatch() {
             currentBatch = null;
         }

+        /**
+         * Adds a flush event to the end of the event queue, after any existing writes in the queue.
+         *
+         * @param expectedBatchEpoch The epoch of the batch to flush.
+         */
+        private void enqueueAdaptiveFlush(int expectedBatchEpoch) {
+            enqueueLast(new CoordinatorInternalEvent("FlushBatch", tp, () -> {
+                withActiveContextOrThrow(tp, context -> {
+                    // The batch could have already been flushed because it reached the maximum
+                    // batch size or a transactional write came in. When this happens, we want
+                    // to avoid flushing the next batch early.
+                    if (context.currentBatch != null && context.batchEpoch == expectedBatchEpoch) {
+                        context.flushCurrentBatch();
+                    }
+                });
+            }));
+        }
+
         /**
          * Flushes the current (or pending) batch to the log. When the batch is written
          * locally, a new snapshot is created in the snapshot registry and the events
@@ -833,7 +860,11 @@ private void flushCurrentBatch() {
          */
         private void maybeFlushCurrentBatch(long currentTimeMs) {
             if (currentBatch != null) {
-                if (currentBatch.builder.isTransactional() || (currentTimeMs - currentBatch.appendTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
+                if (currentBatch.builder.isTransactional() ||
+                    // When adaptive linger time is enabled, we avoid flushing here.
+                    // Instead, we rely on the flush event enqueued at the back of the event queue.
+                    (appendLingerMs.isPresent() && (currentTimeMs - currentBatch.appendTimeMs) >= appendLingerMs.getAsInt()) ||
+                    !currentBatch.builder.hasRoomFor(0)) {
                     flushCurrentBatch();
                 }
             }
@@ -882,20 +913,31 @@ private void maybeAllocateNewBatch(
                 maxBatchSize
             );

+            batchEpoch++;
+
             Optional<TimerTask> lingerTimeoutTask = Optional.empty();
-            if (appendLingerMs > 0) {
-                lingerTimeoutTask = Optional.of(new TimerTask(appendLingerMs) {
-                    @Override
-                    public void run() {
-                        // An event to flush the batch is pushed to the front of the queue
-                        // to ensure that the linger time is respected.
-                        enqueueFirst(new CoordinatorInternalEvent("FlushBatch", tp, () -> {
-                            if (this.isCancelled()) return;
-                            withActiveContextOrThrow(tp, CoordinatorContext::flushCurrentBatch);
-                        }));
-                    }
-                });
-                CoordinatorRuntime.this.timer.add(lingerTimeoutTask.get());
+            if (appendLingerMs.isPresent()) {
+                if (appendLingerMs.getAsInt() > 0) {
+                    lingerTimeoutTask = Optional.of(new TimerTask(appendLingerMs.getAsInt()) {
+                        @Override
+                        public void run() {
+                            // An event to flush the batch is pushed to the front of the queue
+                            // to ensure that the linger time is respected.
+                            enqueueFirst(new CoordinatorInternalEvent("FlushBatch", tp, () -> {
+                                if (this.isCancelled()) return;
+                                withActiveContextOrThrow(tp, CoordinatorContext::flushCurrentBatch);
+                            }));
+                        }
+                    });
+                    CoordinatorRuntime.this.timer.add(lingerTimeoutTask.get());
+                }
+            } else {
+                // Always queue a flush immediately at the end of the queue, unless the batch is
+                // transactional. Transactional batches are flushed immediately at the end of
+                // the write, so a flush event is never needed.
+                if (!builder.isTransactional()) {
+                    enqueueAdaptiveFlush(batchEpoch);
+                }
             }

             currentBatch = new CoordinatorBatch(
@@ -1991,9 +2033,10 @@ public void onHighWatermarkUpdated(

     /**
      * The duration in milliseconds that the coordinator will wait for writes to
-     * accumulate before flushing them to disk.
+     * accumulate before flushing them to disk. {@code OptionalInt.empty()} indicates
+     * an adaptive linger time based on the workload.
      */
-    private final int appendLingerMs;
+    private final OptionalInt appendLingerMs;

     /**
      * The executor service used by the coordinator runtime to schedule
@@ -2045,7 +2088,7 @@ private CoordinatorRuntime(
         CoordinatorMetrics coordinatorMetrics,
         Serializer<U> serializer,
         Compression compression,
-        int appendLingerMs,
+        OptionalInt appendLingerMs,
         ExecutorService executorService
     ) {
         this.logPrefix = logPrefix;
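Note: the enqueueAdaptiveFlush/batchEpoch interplay above is easiest to see in isolation. Below is a minimal, self-contained sketch of the idea; all names in it are hypothetical and the real runtime adds timers, snapshots and error handling. A write that opens a batch queues a flush behind the writes already in the queue; if that batch gets flushed earlier (max batch size, transactional write), the queued flush sees a stale epoch and becomes a no-op, so it cannot flush the next batch early.

import java.util.ArrayDeque;
import java.util.Deque;

public class AdaptiveFlushSketch {
    static final Deque<Runnable> eventQueue = new ArrayDeque<>();
    static StringBuilder currentBatch = null;
    static int batchEpoch = 0;

    static void write(String record) {
        if (currentBatch == null) {
            currentBatch = new StringBuilder();
            batchEpoch++;
            int expectedEpoch = batchEpoch;
            // Queue the flush behind any writes already in the queue,
            // as enqueueAdaptiveFlush does with enqueueLast.
            eventQueue.addLast(() -> {
                // Skip the flush if the batch it was scheduled for is gone.
                if (currentBatch != null && batchEpoch == expectedEpoch) {
                    flush();
                }
            });
        }
        currentBatch.append(record);
        if (currentBatch.length() >= 16) {  // stand-in for the max batch size check
            flush();
        }
    }

    static void flush() {
        System.out.println("flushed: " + currentBatch);
        currentBatch = null;
    }

    public static void main(String[] args) {
        eventQueue.addLast(() -> write("aaaaaaaaaaaaaaaa"));  // fills the first batch
        eventQueue.addLast(() -> write("b"));                 // starts a second batch
        while (!eventQueue.isEmpty()) {
            eventQueue.removeFirst().run();
        }
    }
}

Running the sketch prints the size-triggered flush of the first batch, then the adaptive flush of the second; the stale adaptive flush for the first batch is skipped. This is the behaviour exercised by testAdaptiveAppendLingerWithMaxBatchSizeFlush in the test diff below.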
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index de1aa21f3f116..1876eb8640f56 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -634,7 +634,7 @@ public void testScheduleUnloadingWithDeferredEventExceptions() throws ExecutionE
             .withCoordinatorRuntimeMetrics(metrics)
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
-            .withAppendLingerMs(10)
+            .withAppendLingerMs(OptionalInt.of(10))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -2615,6 +2615,7 @@ public void testHighWatermarkUpdate() {
             .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
+            .withAppendLingerMs(OptionalInt.of(0))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -2689,7 +2690,7 @@ public void testHighWatermarkUpdateWithDeferredEventExceptions() throws Executio
             .withCoordinatorRuntimeMetrics(metrics)
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
-            .withAppendLingerMs(10)
+            .withAppendLingerMs(OptionalInt.of(10))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -2766,6 +2767,7 @@ public void testWriteEventWriteTimeoutTaskIsCancelledWhenHighWatermarkIsUpdated(
             .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
+            .withAppendLingerMs(OptionalInt.of(0))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -3163,7 +3165,7 @@ public void testScheduleWriteOperationWithBatching() throws ExecutionException,
             .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
-            .withAppendLingerMs(10)
+            .withAppendLingerMs(OptionalInt.of(10))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -3298,7 +3300,7 @@ public void testScheduleWriteOperationWithBatchingWhenRecordsTooLarge() {
             .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
-            .withAppendLingerMs(10)
+            .withAppendLingerMs(OptionalInt.of(10))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -3350,7 +3352,7 @@ public void testScheduleWriteOperationWithBatchingWhenWriteFails() {
             .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
-            .withAppendLingerMs(10)
+            .withAppendLingerMs(OptionalInt.of(10))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -3436,7 +3438,7 @@ public void testScheduleWriteOperationWithBatchingWhenReplayFails() {
             .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
-            .withAppendLingerMs(10)
+            .withAppendLingerMs(OptionalInt.of(10))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -3534,7 +3536,7 @@ public void testScheduleTransactionalWriteOperationWithBatching() throws Executi
             .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
-            .withAppendLingerMs(10)
+            .withAppendLingerMs(OptionalInt.of(10))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -3649,6 +3651,453 @@ public void testScheduleTransactionalWriteOperationWithBatching() throws Executi
         assertNull(complete1.get(5, TimeUnit.SECONDS));
     }

+    @Test
+    public void testAdaptiveAppendLingerTime() {
+        MockTimer timer = new MockTimer();
+        ManualEventProcessor processor = new ManualEventProcessor();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(processor)
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(OptionalInt.empty())
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Loads the coordinator. Poll once to execute the load operation and once
+        // to complete the load.
+        runtime.scheduleLoadOperation(TP, 10);
+        processor.poll();
+        processor.poll();
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Write #1.
+        runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("record1", "record2"), "response1")
+        );
+
+        // Write #2.
+        runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("record3"), "response2")
+        );
+
+        // Execute write #1.
+        processor.poll();
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // A flush event is queued after write #2.
+        assertEquals(2, processor.size());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, "record1"),
+            new MockCoordinatorShard.RecordAndMetadata(1, "record2")
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(), writer.entries(TP));
+
+        // Execute write #2.
+        processor.poll();
+        assertEquals(1, processor.size());
+
+        // The batch has not been flushed.
+        assertNotNull(ctx.currentBatch);
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, "record1"),
+            new MockCoordinatorShard.RecordAndMetadata(1, "record2"),
+            new MockCoordinatorShard.RecordAndMetadata(2, "record3")
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(), writer.entries(TP));
+
+        // Flush the batch.
+        processor.poll();
+
+        // The batch is flushed.
+        assertNull(ctx.currentBatch);
+
+        // Verify the state.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, "record1"),
+            new MockCoordinatorShard.RecordAndMetadata(1, "record2"),
+            new MockCoordinatorShard.RecordAndMetadata(2, "record3")
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            TestUtil.records(timer.time().milliseconds(), "record1", "record2", "record3")
+        ), writer.entries(TP));
+    }
+
+    /**
+     * Tests a flush triggered by the max batch size with an adaptive append linger time.
+     *
+     * The flush for the first batch must not flush the second batch.
+     */
+    @Test
+    @SuppressWarnings("checkstyle:MethodLength")
+    public void testAdaptiveAppendLingerWithMaxBatchSizeFlush() {
+        MockTimer timer = new MockTimer();
+        ManualEventProcessor processor = new ManualEventProcessor();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(processor)
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(OptionalInt.empty())
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Loads the coordinator. Poll once to execute the load operation and once
+        // to complete the load.
+        runtime.scheduleLoadOperation(TP, 10);
+        processor.poll();
+        processor.poll();
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Get the max batch size.
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+
+        // Create records with a quarter of the max batch size each. Keep in mind that
+        // each batch has a header so it is not possible to have those four records
+        // in one single batch.
+        List<String> records = Stream.of('1', '2', '3', '4', '5').map(c -> {
+            char[] payload = new char[maxBatchSize / 4];
+            Arrays.fill(payload, c);
+            return new String(payload);
+        }).collect(Collectors.toList());
+
+        // Write #1.
+        runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(0, 1), "response1")
+        );
+
+        // Write #2.
+        runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(1, 2), "response2")
+        );
+
+        // Write #3.
+        runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(2, 3), "response3")
+        );
+
+        // Write #4.
+        runtime.scheduleWriteOperation("write#4", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(3, 4), "response4")
+        );
+
+        // Execute write #1, write #2 and write #3.
+        processor.poll();
+        processor.poll();
+        processor.poll();
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // A flush event is queued after write #4.
+        assertEquals(2, processor.size());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(), writer.entries(TP));
+
+        // Write #5.
+        runtime.scheduleWriteOperation("write#5", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(records.subList(4, 5), "response5")
+        );
+
+        // Execute write #4. This one cannot go into the existing batch
+        // so the existing batch should be flushed and a new one should be created.
+        processor.poll();
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // Another flush event is queued after write #5.
+        assertEquals(3, processor.size());
+
+        // Verify the state.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            TestUtil.records(timer.time().milliseconds(), records.subList(0, 3))
+        ), writer.entries(TP));
+
+        // Execute the first flush.
+        processor.poll();
+        assertEquals(2, processor.size());
+
+        // The flush does not belong to the current batch and is ignored.
+        assertNotNull(ctx.currentBatch);
+
+        // Verify the state.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            TestUtil.records(timer.time().milliseconds(), records.subList(0, 3))
+        ), writer.entries(TP));
+
+        // Execute write #5.
+        processor.poll();
+        assertEquals(1, processor.size());
+
+        // Verify the state.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)),
+            new MockCoordinatorShard.RecordAndMetadata(4, records.get(4))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            TestUtil.records(timer.time().milliseconds(), records.subList(0, 3))
+        ), writer.entries(TP));
+
+        // Execute the second flush.
+        processor.poll();
+        assertEquals(0, processor.size());
+
+        // The batch is flushed.
+        assertNull(ctx.currentBatch);
+
+        // Verify the state.
+        assertEquals(5L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L, 3L, 5L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
+            new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
+            new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
+            new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)),
+            new MockCoordinatorShard.RecordAndMetadata(4, records.get(4))
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            TestUtil.records(timer.time().milliseconds(), records.subList(0, 3)),
+            TestUtil.records(timer.time().milliseconds(), records.subList(3, 5))
+        ), writer.entries(TP));
+    }
+
+    /**
+     * Tests a transactional write with an adaptive append linger time.
+     *
+     * The transactional write must not enqueue a flush, since it flushes immediately.
+     * The flush for the batch before the transactional write must not flush the batch after the
+     * transactional write.
+     */
+    @Test
+    public void testAdaptiveAppendLingerWithTransactionalWrite() {
+        MockTimer timer = new MockTimer();
+        ManualEventProcessor processor = new ManualEventProcessor();
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(processor)
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(OptionalInt.empty())
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        // Loads the coordinator. Poll once to execute the load operation and once
+        // to complete the load.
+        runtime.scheduleLoadOperation(TP, 10);
+        processor.poll();
+        processor.poll();
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertNull(ctx.currentBatch);
+
+        // Write #1.
+        runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("record1"), "response1")
+        );
+
+        // Transactional write #2. This will flush the batch.
+        runtime.scheduleTransactionalWriteOperation(
+            "txn-write#1",
+            TP,
+            "transactional-id",
+            100L,
+            (short) 50,
+            Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("record2"), "response2"),
+            TXN_OFFSET_COMMIT_LATEST_VERSION
+        );
+
+        // Write #3.
+        runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20),
+            state -> new CoordinatorResult<>(List.of("record3"), "response3")
+        );
+
+        assertEquals(3, processor.size());
+
+        // Execute write #1.
+        processor.poll();
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // A flush event is queued after write #3.
+        assertEquals(3, processor.size());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, "record1")
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(), writer.entries(TP));
+
+        // Execute transactional write #2.
+        processor.poll();
+
+        // The batch is flushed.
+        assertNull(ctx.currentBatch);
+
+        // No flush event is queued.
+        assertEquals(2, processor.size());
+
+        // Verify the state. The current batch and the transactional records are
+        // written to the log.
+        assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, "record1")
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            TestUtil.records(timer.time().milliseconds(), "record1"),
+            TestUtil.transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record2")
+        ), writer.entries(TP));
+
+        // Execute write #3.
+        processor.poll();
+
+        // A batch has been created.
+        assertNotNull(ctx.currentBatch);
+
+        // A flush event is queued after the first flush.
+        assertEquals(2, processor.size());
+
+        // Verify the state. Records are replayed but no batch written.
+        assertEquals(2L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L, 1L, 2L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, "record1"),
+            new MockCoordinatorShard.RecordAndMetadata(2, "record3")
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            TestUtil.records(timer.time().milliseconds(), "record1"),
+            TestUtil.transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record2")
+        ), writer.entries(TP));
+
+        // Execute the first flush.
+        processor.poll();
+        assertEquals(1, processor.size());
+
+        // The flush does not belong to the current batch and is ignored.
+        assertNotNull(ctx.currentBatch);
+
+        // Execute the second flush.
+        processor.poll();
+        assertEquals(0, processor.size());
+
+        // The batch is flushed.
+        assertNull(ctx.currentBatch);
+
+        // Verify the state.
+        assertEquals(3L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(List.of(0L, 1L, 2L, 3L), ctx.coordinator.snapshotRegistry().epochsList());
+        assertEquals(List.of(
+            new MockCoordinatorShard.RecordAndMetadata(0, "record1"),
+            new MockCoordinatorShard.RecordAndMetadata(2, "record3")
+        ), ctx.coordinator.coordinator().fullRecords());
+        assertEquals(List.of(
+            TestUtil.records(timer.time().milliseconds(), "record1"),
+            TestUtil.transactionalRecords(100L, (short) 50, timer.time().milliseconds(), "record2"),
+            TestUtil.records(timer.time().milliseconds(), "record3")
+        ), writer.entries(TP));
+    }
+
     @Test
     public void testStateMachineIsReloadedWhenOutOfSync() {
         MockTimer timer = new MockTimer();
@@ -3677,7 +4126,7 @@ public long append(
             .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
-            .withAppendLingerMs(10)
+            .withAppendLingerMs(OptionalInt.of(10))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -3792,7 +4241,7 @@ public void close() {}
             .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
-            .withAppendLingerMs(10)
+            .withAppendLingerMs(OptionalInt.of(10))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -3841,7 +4290,7 @@ public void testScheduleNonAtomicWriteOperation() throws ExecutionException, Int
             .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
-            .withAppendLingerMs(10)
+            .withAppendLingerMs(OptionalInt.of(10))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -3950,7 +4399,7 @@ public void testScheduleNonAtomicWriteOperationWithRecordTooLarge() throws Inter
             .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
-            .withAppendLingerMs(10)
+            .withAppendLingerMs(OptionalInt.of(10))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -4046,7 +4495,7 @@ public void testScheduleNonAtomicWriteOperationWhenWriteFails() {
             .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
-            .withAppendLingerMs(10)
+            .withAppendLingerMs(OptionalInt.of(10))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -4133,7 +4582,7 @@ public void testEmptyBatch() throws Exception {
             .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(serializer)
-            .withAppendLingerMs(10)
+            .withAppendLingerMs(OptionalInt.of(10))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -4202,7 +4651,7 @@ public void testRecordFlushTime() throws Exception {
             .withCoordinatorRuntimeMetrics(runtimeMetrics)
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
-            .withAppendLingerMs(10)
+            .withAppendLingerMs(OptionalInt.of(10))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -4313,7 +4762,7 @@ public void testCompressibleRecordTriggersFlushAndSucceeds() throws Exception {
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withCompression(compression)
             .withSerializer(new StringSerializer())
-            .withAppendLingerMs(10)
+            .withAppendLingerMs(OptionalInt.of(10))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -4399,7 +4848,7 @@ public void testLargeCompressibleRecordTriggersFlushAndSucceeds() throws Excepti
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withCompression(compression)
             .withSerializer(new StringSerializer())
-            .withAppendLingerMs(10)
+            .withAppendLingerMs(OptionalInt.of(10))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -4485,7 +4934,7 @@ public void testLargeUncompressibleRecordTriggersFlushAndFails() throws Exceptio
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withCompression(compression)
             .withSerializer(new StringSerializer())
-            .withAppendLingerMs(10)
+            .withAppendLingerMs(OptionalInt.of(10))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -4576,6 +5025,7 @@ public void testRecordEventPurgatoryTime() throws Exception {
             .withCoordinatorRuntimeMetrics(runtimeMetrics)
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
+            .withAppendLingerMs(OptionalInt.of(0))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -4662,6 +5112,7 @@ public void testWriteEventCompletesOnlyOnce() throws Exception {
             .withCoordinatorRuntimeMetrics(runtimeMetrics)
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
+            .withAppendLingerMs(OptionalInt.of(0))
             .withExecutorService(mock(ExecutorService.class))
             .build();

@@ -4807,6 +5258,7 @@ public void testCoordinatorExecutor() {
             .withCoordinatorRuntimeMetrics(runtimeMetrics)
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
+            .withAppendLingerMs(OptionalInt.of(0))
             .withExecutorService(executorService)
             .build();

@@ -4888,7 +5340,7 @@ public void testLingerTimeComparisonInMaybeFlushCurrentBatch() throws Exception
             .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
             .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
             .withSerializer(new StringSerializer())
-            .withAppendLingerMs(10)
+            .withAppendLingerMs(OptionalInt.of(10))
             .withExecutorService(mock(ExecutorService.class))
             .build();
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DirectEventProcessor.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DirectEventProcessor.java
index 60b74c3a12f72..19c79e267147f 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DirectEventProcessor.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/DirectEventProcessor.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.coordinator.common.runtime;

+import java.util.Deque;
+import java.util.LinkedList;
 import java.util.concurrent.RejectedExecutionException;

 /**
@@ -23,24 +25,43 @@
  * useful in unit tests where execution in threads is not required.
  */
 public class DirectEventProcessor implements CoordinatorEventProcessor {
+    private final Deque<CoordinatorEvent> queue;
+    private boolean inEvent;
+
+    public DirectEventProcessor() {
+        this.queue = new LinkedList<>();
+        this.inEvent = false;
+    }
+
     @Override
     public void enqueueLast(CoordinatorEvent event) throws RejectedExecutionException {
-        try {
-            event.run();
-        } catch (Throwable ex) {
-            event.complete(ex);
-        }
+        queue.addLast(event);
+        processQueue();
     }

     @Override
     public void enqueueFirst(CoordinatorEvent event) throws RejectedExecutionException {
-        try {
-            event.run();
-        } catch (Throwable ex) {
-            event.complete(ex);
-        }
+        queue.addFirst(event);
+        processQueue();
     }

     @Override
     public void close() {}
+
+    private void processQueue() {
+        if (inEvent) {
+            return;
+        }
+
+        inEvent = true;
+        while (!queue.isEmpty()) {
+            CoordinatorEvent event = queue.removeFirst();
+            try {
+                event.run();
+            } catch (Throwable ex) {
+                event.complete(ex);
+            }
+        }
+        inEvent = false;
+    }
 }
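Note: the rewrite above matters for the adaptive linger. An event executed by DirectEventProcessor can now enqueue a follow-up flush event that runs after the current event completes, instead of recursively inside it, which matches the ordering of the real queue-backed processor. A minimal, hypothetical stand-in (plain Runnables instead of the Kafka event types) showing the effect of the inEvent guard:

import java.util.ArrayDeque;
import java.util.Deque;

public class ReentrantQueueSketch {
    private final Deque<Runnable> queue = new ArrayDeque<>();
    private boolean inEvent = false;

    public void enqueueLast(Runnable event) {
        queue.addLast(event);
        processQueue();
    }

    private void processQueue() {
        if (inEvent) return;  // already draining; the new event runs after the current one
        inEvent = true;
        try {
            while (!queue.isEmpty()) {
                queue.removeFirst().run();
            }
        } finally {
            inEvent = false;
        }
    }

    public static void main(String[] args) {
        ReentrantQueueSketch p = new ReentrantQueueSketch();
        p.enqueueLast(() -> {
            System.out.println("write");
            p.enqueueLast(() -> System.out.println("flush"));  // deferred, not nested
            System.out.println("write done");
        });
        // Prints: write, write done, flush.
    }
}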
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index ab7f45ae6895e..d0900a63ce08e 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
 import org.apache.kafka.coordinator.group.Group.GroupType
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
+import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
 import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
@@ -1038,6 +1039,7 @@ class KafkaConfigTest {

         /** New group coordinator configs */
         case GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -2)

         /** Consumer groups configs */
         case GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
@@ -1077,6 +1079,9 @@ class KafkaConfigTest {
         case GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
         case GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)

+        /** Share coordinator configs */
+        case ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -2)
+
         case _ => assertPropertyInvalid(baseProperties, name, "not_a_number", "-1")
       }
     }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index e8a2f49663955..fb3e820ea8856 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -36,6 +36,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.function.Function;
 import java.util.stream.Collectors;

@@ -73,8 +74,9 @@ public class GroupCoordinatorConfig {
     public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " +
         "wait for writes to accumulate before flushing them to disk. Increasing this value improves write efficiency and batch size, " +
         "but also increases the response latency for requests, as the coordinator must wait for batches to be flushed to " +
-        "disk before completing request processing. Transactional writes are not accumulated.";
-    public static final int GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT = 5;
+        "disk before completing request processing. Transactional writes are not accumulated. " +
+        "Set to -1 for an adaptive linger time that minimizes latency based on the workload.";
+    public static final int GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT = -1;

     public static final String GROUP_COORDINATOR_NUM_THREADS_CONFIG = "group.coordinator.threads";
     public static final String GROUP_COORDINATOR_NUM_THREADS_DOC = "The number of threads used by the group coordinator.";
@@ -300,7 +302,7 @@ public class GroupCoordinatorConfig {
         .define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT, ConfigDef.ValidList.in(false, Group.GroupType.documentValidValues()), MEDIUM, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
         .define(GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), HIGH, GROUP_COORDINATOR_NUM_THREADS_DOC)
-        .define(GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
+        .define(GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(-1), MEDIUM, GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
         .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, OFFSET_COMMIT_TIMEOUT_MS_DOC)
         .define(OFFSETS_LOAD_BUFFER_SIZE_CONFIG, INT, OFFSETS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, OFFSETS_LOAD_BUFFER_SIZE_DOC)
         .define(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_REPLICATION_FACTOR_DOC)
@@ -647,10 +649,14 @@ public int numThreads() {

     /**
      * The duration in milliseconds that the coordinator will wait for writes to
-     * accumulate before flushing them to disk.
+     * accumulate before flushing them to disk. {@code OptionalInt.empty()} indicates
+     * an adaptive linger time based on the workload.
      */
-    public int appendLingerMs() {
-        return appendLingerMs;
+    public OptionalInt appendLingerMs() {
+        if (appendLingerMs == -1) {
+            return OptionalInt.empty();
+        }
+        return OptionalInt.of(appendLingerMs);
     }

     /**
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 491df993e0999..60747d811f7fa 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -37,6 +37,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalInt;

 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -218,7 +219,7 @@ public void testConfigs() {
         assertEquals(Duration.ofMinutes(24 * 60 * 60 * 1000L).toMillis(), config.offsetsRetentionMs());
         assertEquals(5000, config.offsetCommitTimeoutMs());
         assertEquals(CompressionType.GZIP, config.offsetTopicCompressionType());
-        assertEquals(10, config.appendLingerMs());
+        assertEquals(OptionalInt.of(10), config.appendLingerMs());
         assertEquals(555, config.offsetsLoadBufferSize());
         assertEquals(111, config.offsetsTopicPartitions());
         assertEquals(11, config.offsetsTopicReplicationFactor());
@@ -325,6 +326,18 @@ public void testInvalidConfigs() {
             assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage());
     }

+    @Test
+    public void testAppendLingerMs() {
+        GroupCoordinatorConfig config = createConfig(Map.of(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, -1));
+        assertEquals(OptionalInt.empty(), config.appendLingerMs());
+
+        config = createConfig(Map.of(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, 0));
+        assertEquals(OptionalInt.of(0), config.appendLingerMs());
+
+        config = createConfig(Map.of(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, 5));
+        assertEquals(OptionalInt.of(5), config.appendLingerMs());
+    }
+
     public static GroupCoordinatorConfig createGroupCoordinatorConfig(
         int offsetMetadataMaxSize,
         long offsetsRetentionCheckIntervalMs,
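Note: with the new -1 default, operators opt back into the old fixed behaviour by setting an explicit value, e.g. group.coordinator.append.linger.ms=5 in the broker properties. The accessor maps the sentinel onto OptionalInt as sketched below; this is a hypothetical stand-alone mirror of appendLingerMs() above, not the Kafka class itself:

import java.util.OptionalInt;

public class LingerConfigSketch {
    // -1 (the new default) means "adaptive"; 0 or more is a fixed linger time.
    static OptionalInt appendLingerMs(int raw) {
        return raw == -1 ? OptionalInt.empty() : OptionalInt.of(raw);
    }

    public static void main(String[] args) {
        for (int raw : new int[] {-1, 0, 5}) {
            OptionalInt linger = appendLingerMs(raw);
            System.out.println(raw + " -> " + (linger.isPresent()
                ? "fixed linger of " + linger.getAsInt() + " ms"
                : "adaptive linger"));
        }
    }
}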
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
index b92947af9cd60..98e0a2a5f25c4 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfig.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.common.utils.Utils;

 import java.util.Optional;
+import java.util.OptionalInt;

 import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
 import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
@@ -69,8 +70,9 @@ public class ShareCoordinatorConfig {
     public static final String STATE_TOPIC_COMPRESSION_CODEC_DOC = "Compression codec for the share-group state topic.";

     public static final String APPEND_LINGER_MS_CONFIG = "share.coordinator.append.linger.ms";
-    public static final int APPEND_LINGER_MS_DEFAULT = 5;
-    public static final String APPEND_LINGER_MS_DOC = "The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk.";
+    public static final int APPEND_LINGER_MS_DEFAULT = -1;
+    public static final String APPEND_LINGER_MS_DOC = "The duration in milliseconds that the share coordinator will wait for writes to accumulate before flushing them to disk. " +
+        "Set to -1 for an adaptive linger time that minimizes latency based on the workload.";

     public static final String STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG = "share.coordinator.state.topic.prune.interval.ms";
     public static final int STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes
@@ -89,7 +91,7 @@ public class ShareCoordinatorConfig {
         .define(SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_CONFIG, INT, SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_DEFAULT, atLeast(0), MEDIUM, SNAPSHOT_UPDATE_RECORDS_PER_SNAPSHOT_DOC)
         .define(LOAD_BUFFER_SIZE_CONFIG, INT, LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, LOAD_BUFFER_SIZE_DOC)
         .define(STATE_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) STATE_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, STATE_TOPIC_COMPRESSION_CODEC_DOC)
-        .define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, APPEND_LINGER_MS_DOC)
+        .define(APPEND_LINGER_MS_CONFIG, INT, APPEND_LINGER_MS_DEFAULT, atLeast(-1), MEDIUM, APPEND_LINGER_MS_DOC)
         .define(WRITE_TIMEOUT_MS_CONFIG, INT, WRITE_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, WRITE_TIMEOUT_MS_DOC)
         .defineInternal(STATE_TOPIC_PRUNE_INTERVAL_MS_CONFIG, INT, STATE_TOPIC_PRUNE_INTERVAL_MS_DEFAULT, atLeast(1), LOW, STATE_TOPIC_PRUNE_INTERVAL_MS_DOC)
         .defineInternal(COLD_PARTITION_SNAPSHOT_INTERVAL_MS_CONFIG, INT, COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DEFAULT, atLeast(1), LOW, COLD_PARTITION_SNAPSHOT_INTERVAL_MS_DOC);
@@ -157,8 +159,16 @@ public int shareCoordinatorLoadBufferSize() {
         return loadBufferSize;
     }

-    public int shareCoordinatorAppendLingerMs() {
-        return appendLingerMs;
+    /**
+     * The duration in milliseconds that the coordinator will wait for writes to
+     * accumulate before flushing them to disk. {@code OptionalInt.empty()} indicates
+     * an adaptive linger time based on the workload.
+     */
+    public OptionalInt shareCoordinatorAppendLingerMs() {
+        if (appendLingerMs == -1) {
+            return OptionalInt.empty();
+        }
+        return OptionalInt.of(appendLingerMs);
     }

     public CompressionType shareCoordinatorStateTopicCompressionType() {
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java
new file mode 100644
index 0000000000000..86ae8c1101a37
--- /dev/null
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorConfigTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.common.config.AbstractConfig;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.OptionalInt;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ShareCoordinatorConfigTest {
+
+    @Test
+    public void testAppendLingerMs() {
+        ShareCoordinatorConfig config = createConfig(Map.of(ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG, -1));
+        assertEquals(OptionalInt.empty(), config.shareCoordinatorAppendLingerMs());
+
+        config = createConfig(Map.of(ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG, 0));
+        assertEquals(OptionalInt.of(0), config.shareCoordinatorAppendLingerMs());
+
+        config = createConfig(Map.of(ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG, 5));
+        assertEquals(OptionalInt.of(5), config.shareCoordinatorAppendLingerMs());
+    }
+
+    public static ShareCoordinatorConfig createConfig(Map<String, ?> configs) {
+        return new ShareCoordinatorConfig(new AbstractConfig(
+            ShareCoordinatorConfig.CONFIG_DEF,
+            configs,
+            false
+        ));
+    }
+}
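Note: this diff does not show the call sites that feed these accessors into CoordinatorRuntime.Builder.withAppendLingerMs(OptionalInt); presumably they are updated elsewhere in the PR. The runtime interprets the value in three modes, which the hypothetical stand-alone sketch below mirrors: empty (the new -1 default) selects the adaptive queue-based flush added in CoordinatorRuntime, 0 flushes on every write, and a positive value arms the linger timer.

import java.util.OptionalInt;

public class LingerModeSketch {
    enum FlushMode { ADAPTIVE, TIMER, IMMEDIATE }

    // Hypothetical mirror of how the runtime interprets the builder argument.
    static FlushMode flushMode(OptionalInt appendLingerMs) {
        if (appendLingerMs.isEmpty()) return FlushMode.ADAPTIVE;
        return appendLingerMs.getAsInt() > 0 ? FlushMode.TIMER : FlushMode.IMMEDIATE;
    }

    public static void main(String[] args) {
        System.out.println(flushMode(OptionalInt.empty()));  // ADAPTIVE
        System.out.println(flushMode(OptionalInt.of(10)));   // TIMER
        System.out.println(flushMode(OptionalInt.of(0)));    // IMMEDIATE
    }
}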