Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,26 @@ public long sizeInBytes() {
return sizeInBytesByMinGen(-1);
}

long earliestLastModifiedAge() {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
return findEarliestLastModifiedAge(System.currentTimeMillis(), readers, current);
} catch (IOException e) {
throw new TranslogException(shardId, "Unable to get the earliest last modified time for the transaction log");
}
}

/**
* Returns the age of the oldest entry in the translog files in seconds
*/
static long findEarliestLastModifiedAge(long currentTime, Iterable<TranslogReader> readers, TranslogWriter writer) throws IOException {
long earliestTime = currentTime;
for (BaseTranslogReader r : readers) {
earliestTime = Math.min(r.getLastModifiedTime(), earliestTime);
}
return currentTime - Math.min(earliestTime, writer.getLastModifiedTime());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a Math.max(0, currentTime - Math.min()) ? we rely on this being non negative, but time may go back and the FS may have other quirks.

}

/**
* Returns the number of operations in the transaction files that aren't committed to lucene..
*/
Expand Down Expand Up @@ -751,7 +771,7 @@ private void closeOnTragicEvent(Exception ex) {
public TranslogStats stats() {
// acquire lock to make the two numbers roughly consistent (no file change half way)
try (ReleasableLock lock = readLock.acquire()) {
return new TranslogStats(totalOperations(), sizeInBytes(), uncommittedOperations(), uncommittedSizeInBytes());
return new TranslogStats(totalOperations(), sizeInBytes(), uncommittedOperations(), uncommittedSizeInBytes(), earliestLastModifiedAge());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ public class TranslogStats implements Streamable, ToXContentFragment {
private int numberOfOperations;
private long uncommittedSizeInBytes;
private int uncommittedOperations;
private long earliestLastModifiedAge;

public TranslogStats() {
}

public TranslogStats(int numberOfOperations, long translogSizeInBytes, int uncommittedOperations, long uncommittedSizeInBytes) {
public TranslogStats(int numberOfOperations, long translogSizeInBytes, int uncommittedOperations, long uncommittedSizeInBytes,
long earliestLastModifiedAge) {
if (numberOfOperations < 0) {
throw new IllegalArgumentException("numberOfOperations must be >= 0");
}
Expand All @@ -51,10 +53,14 @@ public TranslogStats(int numberOfOperations, long translogSizeInBytes, int uncom
if (uncommittedSizeInBytes < 0) {
throw new IllegalArgumentException("uncommittedSizeInBytes must be >= 0");
}
if (earliestLastModifiedAge < 0) {
throw new IllegalArgumentException("earliestLastModifiedAge must be >= 0");
}
this.numberOfOperations = numberOfOperations;
this.translogSizeInBytes = translogSizeInBytes;
this.uncommittedSizeInBytes = uncommittedSizeInBytes;
this.uncommittedOperations = uncommittedOperations;
this.earliestLastModifiedAge = earliestLastModifiedAge;
}

public void add(TranslogStats translogStats) {
Expand All @@ -66,6 +72,8 @@ public void add(TranslogStats translogStats) {
this.translogSizeInBytes += translogStats.translogSizeInBytes;
this.uncommittedOperations += translogStats.uncommittedOperations;
this.uncommittedSizeInBytes += translogStats.uncommittedSizeInBytes;
this.earliestLastModifiedAge =
Math.min(this.earliestLastModifiedAge, translogStats.earliestLastModifiedAge);
}

public long getTranslogSizeInBytes() {
Expand All @@ -86,13 +94,16 @@ public int getUncommittedOperations() {
return uncommittedOperations;
}

public long getEarliestLastModifiedAge() { return earliestLastModifiedAge; }

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("translog");
builder.field("operations", numberOfOperations);
builder.byteSizeField("size_in_bytes", "size", translogSizeInBytes);
builder.field("uncommitted_operations", uncommittedOperations);
builder.byteSizeField("uncommitted_size_in_bytes", "uncommitted_size", uncommittedSizeInBytes);
builder.field("earliest_last_modified_age", earliestLastModifiedAge);
builder.endObject();
return builder;
}
Expand All @@ -113,6 +124,9 @@ public void readFrom(StreamInput in) throws IOException {
uncommittedOperations = numberOfOperations;
uncommittedSizeInBytes = translogSizeInBytes;
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
earliestLastModifiedAge = in.readVLong();
}
}

@Override
Expand All @@ -123,5 +137,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(uncommittedOperations);
out.writeVLong(uncommittedSizeInBytes);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeVLong(earliestLastModifiedAge);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.isIn;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.stub;

@LuceneTestCase.SuppressFileSystems("ExtrasFS")
public class TranslogTests extends ESTestCase {
Expand Down Expand Up @@ -356,6 +358,25 @@ protected TranslogStats stats() throws IOException {
return stats;
}

public void testFindEarliestLastModifiedAge() throws IOException {
long fixedTime = System.currentTimeMillis();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will be good to use randomized numbers here. It will be easier to reproduce failures and it will also test time go back.

long[] periods = new long[10];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we sometime test the case where have no readers?

for (int i = 0; i < 9; i++) {
periods[i] = randomLongBetween(10000, 1000000);
}
List<TranslogReader> readers = new ArrayList<>();
for (long l : periods) {
TranslogReader r = mock(TranslogReader.class);
stub(r.getLastModifiedTime()).toReturn(fixedTime - l);
readers.add(r);
}
long period = randomLongBetween(10000, 1000000);
periods[9] = period;
TranslogWriter w = mock(TranslogWriter.class);
stub(w.getLastModifiedTime()).toReturn(fixedTime - period);
assertThat(Translog.findEarliestLastModifiedAge(fixedTime, readers, w), equalTo(LongStream.of(periods).max().orElse(0L)));
}

public void testStats() throws IOException {
// self control cleaning for test
translog.getDeletionPolicy().setRetentionSizeInBytes(1024 * 1024);
Expand All @@ -374,6 +395,7 @@ public void testStats() throws IOException {
assertThat(stats.getTranslogSizeInBytes(), equalTo(97L));
assertThat(stats.getUncommittedOperations(), equalTo(1));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(97L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L));
}

translog.add(new Translog.Delete("test", "2", 1, newUid("2")));
Expand All @@ -383,6 +405,7 @@ public void testStats() throws IOException {
assertThat(stats.getTranslogSizeInBytes(), equalTo(146L));
assertThat(stats.getUncommittedOperations(), equalTo(2));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(146L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L));
}

translog.add(new Translog.Delete("test", "3", 2, newUid("3")));
Expand All @@ -392,6 +415,7 @@ public void testStats() throws IOException {
assertThat(stats.getTranslogSizeInBytes(), equalTo(195L));
assertThat(stats.getUncommittedOperations(), equalTo(3));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(195L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L));
}

translog.add(new Translog.NoOp(3, 1, randomAlphaOfLength(16)));
Expand All @@ -401,6 +425,7 @@ public void testStats() throws IOException {
assertThat(stats.getTranslogSizeInBytes(), equalTo(237L));
assertThat(stats.getUncommittedOperations(), equalTo(4));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(237L));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L));
}

final long expectedSizeInBytes = 280L;
Expand All @@ -411,6 +436,7 @@ public void testStats() throws IOException {
assertThat(stats.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes));
assertThat(stats.getUncommittedOperations(), equalTo(4));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(expectedSizeInBytes));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L));
}

{
Expand All @@ -428,7 +454,8 @@ public void testStats() throws IOException {
copy.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
assertThat(builder.string(), equalTo("{\"translog\":{\"operations\":4,\"size_in_bytes\":" + expectedSizeInBytes
+ ",\"uncommitted_operations\":4,\"uncommitted_size_in_bytes\":" + expectedSizeInBytes + "}}"));
+ ",\"uncommitted_operations\":4,\"uncommitted_size_in_bytes\":" + expectedSizeInBytes
+ ",\"earliest_last_modified_age\":" + stats.getEarliestLastModifiedAge() + "}}"));
}
}

Expand All @@ -439,6 +466,7 @@ public void testStats() throws IOException {
assertThat(stats.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes));
assertThat(stats.getUncommittedOperations(), equalTo(0));
assertThat(stats.getUncommittedSizeInBytes(), equalTo(firstOperationPosition));
assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L));
}
}

Expand Down Expand Up @@ -468,12 +496,12 @@ public void testUncommittedOperations() throws Exception {
}

public void testTotalTests() {
final TranslogStats total = new TranslogStats();
final TranslogStats total = new TranslogStats(0, 0, 0, 0, 1);
final int n = randomIntBetween(0, 16);
final List<TranslogStats> statsList = new ArrayList<>(n);
for (int i = 0; i < n; i++) {
final TranslogStats stats = new TranslogStats(randomIntBetween(1, 4096), randomIntBetween(1, 1 << 20),
randomIntBetween(1, 1 << 20), randomIntBetween(1, 4096));
randomIntBetween(1, 1 << 20), randomIntBetween(1, 4096), randomIntBetween(1, 1 << 20));
statsList.add(stats);
total.add(stats);
}
Expand All @@ -490,22 +518,30 @@ public void testTotalTests() {
assertThat(
total.getUncommittedSizeInBytes(),
equalTo(statsList.stream().mapToLong(TranslogStats::getUncommittedSizeInBytes).sum()));
assertThat(
total.getEarliestLastModifiedAge(),
equalTo(1L));
}

public void testNegativeNumberOfOperations() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(-1, 1, 1, 1));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(-1, 1, 1, 1, 1));
assertThat(e, hasToString(containsString("numberOfOperations must be >= 0")));
e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, 1, -1, 1));
e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, 1, -1, 1, 1));
assertThat(e, hasToString(containsString("uncommittedOperations must be >= 0")));
}

public void testNegativeSizeInBytes() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, -1, 1, 1));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, -1, 1, 1, 1));
assertThat(e, hasToString(containsString("translogSizeInBytes must be >= 0")));
e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, 1, 1, -1));
e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, 1, 1, -1, 1));
assertThat(e, hasToString(containsString("uncommittedSizeInBytes must be >= 0")));
}

public void testOldestEntryInSeconds() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new TranslogStats(1, 1, 1, 1, -1));
assertThat(e, hasToString(containsString("earliestLastModifiedAge must be >= 0")));
}

public void testSnapshot() throws IOException {
ArrayList<Translog.Operation> ops = new ArrayList<>();
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
Expand Down