|
30 | 30 |
|
31 | 31 | import static org.apache.kafka.common.utils.Utils.mkEntry;
|
32 | 32 | import static org.apache.kafka.common.utils.Utils.mkMap;
|
| 33 | +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT; |
33 | 34 |
|
34 | 35 | public class ListOffsetsTest extends TieredStorageTestHarness {
|
35 | 36 | @Override
|
@@ -94,7 +95,17 @@ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
|
94 | 95 | .expectListOffsets(broker1, topicA, p0, OffsetSpec.forTimestamp(timestamp + 3), new EpochEntry(Integer.MAX_VALUE, 3))
|
95 | 96 |
|
96 | 97 | // delete some records and check whether the earliest_offset gets updated.
|
| 98 | + .expectDeletionInRemoteStorage(broker1, topicA, p0, DELETE_SEGMENT, 1) |
97 | 99 | .deleteRecords(topicA, p0, 3L)
|
98 |
| - .expectListOffsets(broker1, topicA, p0, OffsetSpec.earliest(), new EpochEntry(Integer.MAX_VALUE, 3)); |
| 100 | + .expectListOffsets(broker1, topicA, p0, OffsetSpec.earliest(), new EpochEntry(Integer.MAX_VALUE, 3)) |
| 101 | + .expectListOffsets(broker1, topicA, p0, OffsetSpec.earliestLocalSpec(), new EpochEntry(Integer.MAX_VALUE, 4)) |
| 102 | + |
| 103 | + // delete all the records in remote layer and expect that earliest and earliest_local offsets are same |
| 104 | + .expectDeletionInRemoteStorage(broker1, topicA, p0, DELETE_SEGMENT, 1) |
| 105 | + .deleteRecords(topicA, p0, 4L) |
| 106 | + .expectListOffsets(broker1, topicA, p0, OffsetSpec.earliest(), new EpochEntry(Integer.MAX_VALUE, 4)) |
| 107 | + .expectListOffsets(broker1, topicA, p0, OffsetSpec.earliestLocalSpec(), new EpochEntry(Integer.MAX_VALUE, 4)) |
| 108 | + .expectListOffsets(broker1, topicA, p0, OffsetSpec.latestTierSpec(), new EpochEntry(-1, 3)) |
| 109 | + .expectListOffsets(broker1, topicA, p0, OffsetSpec.latest(), new EpochEntry(Integer.MAX_VALUE, 6)); |
99 | 110 | }
|
100 | 111 | }
|
0 commit comments