Merged
42 changes: 18 additions & 24 deletions core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -102,43 +102,37 @@ public static Iterable<Snapshot> ancestorsOf(long snapshotId, Function<Long, Sna
}

/**
* Traverses the history of the table's current snapshot and:
* 1. returns null, if no snapshot exists or target timestamp is more recent than the current snapshot.
* 2. else return the first snapshot which satisfies {@literal >=} targetTimestamp.
* <p>
* Given the snapshots (with timestamp): [S1 (10), S2 (11), S3 (12), S4 (14)]
* <p>
* firstSnapshotAfterTimestamp(table, x {@literal <=} 10) = S1
* firstSnapshotAfterTimestamp(table, 11) = S2
* firstSnapshotAfterTimestamp(table, 13) = S4
* firstSnapshotAfterTimestamp(table, 14) = S4
* firstSnapshotAfterTimestamp(table, x {@literal >} 14) = null
* <p>
* where x is the target timestamp in milliseconds and Si is the snapshot
* Traverses the history of the table's current snapshot and finds the first snapshot committed after the given time.
*
* @param table a table
* @param targetTimestampMillis a timestamp in milliseconds
* @return the first snapshot which satisfies {@literal >=} targetTimestamp, or null if the current snapshot is
* more recent than the target timestamp
* @param timestampMillis a timestamp in milliseconds
* @return the first snapshot after the given timestamp, or null if the current snapshot is older than the timestamp
Member
I would add the word "committed" again here and before all the "afters". I don't know why I can't handle both concepts in my head at the same time, but I keep visualizing a linked list of snapshots.

Member

I'm trying to understand the removal of ">=" from this and other comments, is it intended?

If so, it doesn't seem to match the code below, which still seems to return ">=":

else if (snapshot.timestampMillis() == timestampMillis)
   return snapshot;

Contributor Author

I replaced the previous Javadoc with the original one that I wrote. I can be more clear here if needed.

Member
@szehon-ho Dec 20, 2021

OK, got it that it's a new method rather than a change. To me the previous javadoc is clearer, as "older" gives the impression of strictly greater than.

* @throws IllegalStateException if the first ancestor after the given time can't be determined
*/
public static Snapshot firstSnapshotAfterTimestamp(Table table, Long targetTimestampMillis) {
Snapshot currentSnapshot = table.currentSnapshot();
// Return null if no snapshot exists or target timestamp is more recent than the current snapshot
if (currentSnapshot == null || currentSnapshot.timestampMillis() < targetTimestampMillis) {
public static Snapshot oldestAncestorAfter(Table table, long timestampMillis) {
if (table.currentSnapshot() == null) {
// there are no snapshots or ancestors
return null;
}

// Return the oldest snapshot which satisfies >= targetTimestamp
Snapshot lastSnapshot = null;
for (Snapshot snapshot : currentAncestors(table)) {
if (snapshot.timestampMillis() < targetTimestampMillis) {
if (snapshot.timestampMillis() < timestampMillis) {
return lastSnapshot;
} else if (snapshot.timestampMillis() == timestampMillis) {
return snapshot;
Member
@szehon-ho Dec 20, 2021

Maybe I'm missing something, but I'm curious: what case does this new condition fix?

Even in the old code, seems like it would return a snapshot which satisfies timestamp >= snapshot.timestamp?

Contributor Author

This isn't changing the method as much as completely replacing the old code with a different implementation that I suggested originally. I didn't want to go through and figure out what had changed and why, I just wanted to make it work.

The clause here catches the case where the current snapshot is the one to return because its timestamp matches the requested timestamp. In that case, there's no need to have an earlier parent so we short-circuit early.

Member

OK, thanks for context that it's a new method rather than changing the existing one.

The previous code is slightly easier (one less case for the user to understand, and in my understanding it still works but just takes one more cycle), but not a big deal as the new case is straightforward.

If it was me, I'd prefer the clarity of Stream methods, but I guess we do mostly manual traversals in Iceberg due to performance.

currentAncestors(table).stream().filter(s -> s.timestampMillis() <= timestampMillis).findFirst()

Contributor Author

In this case, we're also more carefully traversing the ancestors. We aren't finding the snapshot before timestampMillis, we're finding the child of the first snapshot before timestampMillis. And if the parent doesn't exist, it throws an exception.

}

lastSnapshot = snapshot;
}

// Return the oldest snapshot if the target timestamp is less than the oldest snapshot of the table
return lastSnapshot;
if (lastSnapshot != null && lastSnapshot.parentId() == null) {
// this is the first snapshot in the table, return it
Member

I am a little worried about having a function which works for a given input but only until the starting snapshot is expired. For example

oldestAncestorAfter(table,  Long.MinValue) // Returns first snapshot
expireSnapshots() // Expire first snapshot
oldestAncestorAfter(table,  Long.MinValue) // Throws exception

I think if we want to standardize, this should probably also throw an exception.

Contributor Author

I see your point here, but the result is based on the table state that gets passed in. If the table state is missing information, then we can't make it consistent.

Here's another way to think about it:

t1 = commitSnapshotOne()
t2 = commitSnapshotTwo()
oldestAncestorAfter(table, Long.MinValue) // returns snapshot one
expireSnapshots(t2 - 1)
oldestAncestorAfter(table, Long.MinValue) // returns snapshot two

I think that the behavior above is worse than throwing an exception based on the table state because it is silently inconsistent. At least throwing an exception tells you why it isn't returning the expected value.

Member

I agree that we should change it; I just think we can always throw the exception, so

oldestAncestorAfter(table,  Long.MinValue) // Throw exception
expireSnapshots() // Expire first snapshot
oldestAncestorAfter(table,  Long.MinValue) // Throw exception

return lastSnapshot;
}

throw new IllegalStateException(
"Cannot find snapshot older than " + DateTimeUtil.formatTimestampMillis(timestampMillis));
}
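The semantics being debated above can be checked against the example timestamps from the removed Javadoc ([S1 (10), S2 (11), S3 (12), S4 (14)]). The following is a self-contained sketch of the new traversal — `Snapshot` here is a toy stand-in, not the Iceberg class, and ancestors are assumed to be ordered newest-first, mirroring `currentAncestors(table)`:

```java
import java.util.Arrays;
import java.util.List;

// Toy model of SnapshotUtil.oldestAncestorAfter, not the Iceberg API.
public class OldestAncestorAfterSketch {
  static class Snapshot {
    final long id;
    final long timestampMillis;
    final Long parentId; // null means this is the first snapshot in the table

    Snapshot(long id, long timestampMillis, Long parentId) {
      this.id = id;
      this.timestampMillis = timestampMillis;
      this.parentId = parentId;
    }
  }

  // ancestors are ordered newest-first
  static Snapshot oldestAncestorAfter(List<Snapshot> ancestors, long timestampMillis) {
    Snapshot lastSnapshot = null;
    for (Snapshot snapshot : ancestors) {
      if (snapshot.timestampMillis < timestampMillis) {
        // the previous (younger) snapshot is the oldest one committed at or after
        // the timestamp; null when even the current snapshot predates it
        return lastSnapshot;
      } else if (snapshot.timestampMillis == timestampMillis) {
        return snapshot; // exact match short-circuits
      }
      lastSnapshot = snapshot;
    }

    if (lastSnapshot != null && lastSnapshot.parentId == null) {
      // reached the table's first snapshot; every snapshot is after the timestamp
      return lastSnapshot;
    }

    // the oldest known ancestor has an expired parent, so the answer is unknowable
    throw new IllegalStateException("Cannot find snapshot older than " + timestampMillis);
  }

  public static void main(String[] args) {
    // S1 (10), S2 (11), S3 (12), S4 (14), newest first
    List<Snapshot> ancestors = Arrays.asList(
        new Snapshot(4, 14, 3L),
        new Snapshot(3, 12, 2L),
        new Snapshot(2, 11, 1L),
        new Snapshot(1, 10, null));

    System.out.println(oldestAncestorAfter(ancestors, 5).id);   // S1
    System.out.println(oldestAncestorAfter(ancestors, 13).id);  // S4
    System.out.println(oldestAncestorAfter(ancestors, 15));     // null
  }
}
```

With only [S4, S3] remaining (S2 and S1 expired), a target of 5 throws `IllegalStateException`, which is the expiry scenario discussed in the thread above.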

@@ -99,12 +99,12 @@ public class SparkMicroBatchStream implements MicroBatchStream {
@Override
public Offset latestOffset() {
table.refresh();
if (isStreamEmpty(table)) {
if (table.currentSnapshot() == null) {
return StreamingOffset.START_OFFSET;
}

if (isFutureStartTime(table, fromTimestamp)) {
return initialFutureStartOffset(table);
if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
return StreamingOffset.START_OFFSET;
}

Snapshot latestSnapshot = table.currentSnapshot();
@@ -169,8 +169,7 @@ public void stop() {
private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset) {
List<FileScanTask> fileScanTasks = Lists.newArrayList();
StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ?
new StreamingOffset(SnapshotUtil.firstSnapshotAfterTimestamp(table, fromTimestamp).snapshotId(), 0, false) :
startOffset;
determineStartingOffset(table, fromTimestamp) : startOffset;
Member
@RussellSpitzer Dec 20, 2021

I wonder if it would be simpler if we just drop the ternary operator here and have "suppliedOffset" be a parameter, like

private Offset determineStartingOffset(table, fromTimestamp, startOffset) {
  if (startOffset != null && !StreamingOffset.START_OFFSET.equals(startOffset)) {
    return startOffset;
  }
  ... logic
}

private Offset determineStartingOffset(table, fromTimestamp) {
  return determineStartingOffset(table, fromTimestamp, null);
}
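For reference, the overload shape sketched in the comment above compiles cleanly once types are filled in. `StreamingOffset` below is a toy stand-in (a sentinel compared by reference equality), not the Spark/Iceberg class, and the fallback branch stands in for the timestamp-based logic:

```java
// Toy stand-ins to show the suggested overload shape only, not real Spark/Iceberg types.
public class OverloadSketch {
  static class StreamingOffset {
    static final StreamingOffset START_OFFSET = new StreamingOffset(-1); // sentinel
    final long snapshotId;

    StreamingOffset(long snapshotId) {
      this.snapshotId = snapshotId;
    }
  }

  // oldestSnapshotId is a hypothetical placeholder for the timestamp-based logic
  static StreamingOffset determineStartingOffset(long oldestSnapshotId, StreamingOffset startOffset) {
    if (startOffset != null && !StreamingOffset.START_OFFSET.equals(startOffset)) {
      return startOffset; // a concrete offset was supplied, use it as-is
    }
    // ... the timestamp-based logic would go here ...
    return new StreamingOffset(oldestSnapshotId);
  }

  static StreamingOffset determineStartingOffset(long oldestSnapshotId) {
    return determineStartingOffset(oldestSnapshotId, null);
  }

  public static void main(String[] args) {
    System.out.println(determineStartingOffset(7).snapshotId);                               // 7
    System.out.println(determineStartingOffset(7, new StreamingOffset(3)).snapshotId);       // 3
    System.out.println(determineStartingOffset(7, StreamingOffset.START_OFFSET).snapshotId); // 7
  }
}
```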

Contributor Author

I think that the problem is that we have 3 "start" offsets here:

  1. StreamingOffset.START_OFFSET is a fake offset for when we don't have enough information, like when the table has no snapshots or the timestamp is in the future
  2. The "starting" offset referred to by determineStartingOffset is the concrete replacement for START_OFFSET, if there is enough information to start; otherwise it returns START_OFFSET as a placeholder again
  3. startOffset is the known starting point for this batch, from the last batch's end offset or initialization

I don't think that it makes sense to mix startOffset into the method because they're different things and determineStartingOffset is already fairly complicated. We could have a separate method, determineBatchStartOffset instead, but that would basically be this expression.

A better fix for this confusion is probably to rename the method. Maybe determineInitialOffset is more clear?


StreamingOffset currentOffset = null;

@@ -208,26 +207,31 @@ private boolean shouldProcess(Snapshot snapshot) {
return op.equals(DataOperations.APPEND);
}

private static boolean isStreamEmpty(Table table) {
return table.currentSnapshot() == null;
}

private static boolean isStreamNotEmpty(Table table) {
return table.currentSnapshot() != null;
}
private static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp) {
if (table.currentSnapshot() == null) {
return StreamingOffset.START_OFFSET;
}

private static boolean isFutureStartTime(Table table, Long streamStartTimeStampMillis) {
if (streamStartTimeStampMillis == null) {
return false;
if (fromTimestamp == null) {
// match existing behavior and start from the oldest snapshot
return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
}

return table.currentSnapshot().timestampMillis() < streamStartTimeStampMillis;
}
if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
return StreamingOffset.START_OFFSET;
}

private static StreamingOffset initialFutureStartOffset(Table table) {
Preconditions.checkNotNull(table, "Cannot process future start offset with invalid table input.");
Snapshot latestSnapshot = table.currentSnapshot();
return new StreamingOffset(latestSnapshot.snapshotId(), Iterables.size(latestSnapshot.addedFiles()) + 1, false);
try {
Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table, fromTimestamp);
if (snapshot != null) {
return new StreamingOffset(snapshot.snapshotId(), 0, false);
} else {
return StreamingOffset.START_OFFSET;
}
} catch (IllegalStateException e) {
// could not determine the first snapshot after the timestamp. use the oldest ancestor instead
return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
}
}
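Taken together, the new `determineStartingOffset` has four outcomes: no snapshots, no `fromTimestamp` (legacy behavior), a future timestamp, and a concrete ancestor (with a fallback when an ancestor was expired). A self-contained model of that decision — toy `Snapshot` type, with a `null` return playing the role of `StreamingOffset.START_OFFSET`, and ancestors ordered newest-first:

```java
import java.util.List;

// Toy model of the determineStartingOffset decision above, not the real classes.
public class StartingOffsetSketch {
  static class Snapshot {
    final long id;
    final long timestampMillis;
    final Long parentId; // null means first snapshot in the table

    Snapshot(long id, long timestampMillis, Long parentId) {
      this.id = id;
      this.timestampMillis = timestampMillis;
      this.parentId = parentId;
    }
  }

  // returns the snapshot id to start streaming from, or null for START_OFFSET
  static Long determineStartingOffset(List<Snapshot> ancestors, Long fromTimestamp) {
    if (ancestors.isEmpty()) {
      return null; // no snapshots yet: START_OFFSET
    }

    if (fromTimestamp == null) {
      // match existing behavior and start from the oldest snapshot
      return ancestors.get(ancestors.size() - 1).id;
    }

    if (ancestors.get(0).timestampMillis < fromTimestamp) {
      return null; // fromTimestamp is in the future: START_OFFSET again
    }

    Snapshot lastSnapshot = null;
    for (Snapshot snapshot : ancestors) {
      if (snapshot.timestampMillis < fromTimestamp) {
        return lastSnapshot.id; // oldest ancestor committed at or after fromTimestamp
      } else if (snapshot.timestampMillis == fromTimestamp) {
        return snapshot.id;
      }
      lastSnapshot = snapshot;
    }

    // fromTimestamp predates every known ancestor; in the real code this is either
    // the table's first snapshot or the IllegalStateException fallback, and in this
    // model both resolve to the oldest known ancestor
    return lastSnapshot.id;
  }
}
```

Note that the `currentSnapshot().timestampMillis() < fromTimestamp` check is redundant with the traversal (the loop would return null anyway), but it makes the future-timestamp case explicit.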

private static class InitialOffsetStore {
@@ -250,11 +254,7 @@ public StreamingOffset initialOffset() {
}

table.refresh();
StreamingOffset offset = StreamingOffset.START_OFFSET;
if (isStreamNotEmpty(table)) {
offset = isFutureStartTime(table, fromTimestamp) ? initialFutureStartOffset(table) :
new StreamingOffset(SnapshotUtil.firstSnapshotAfterTimestamp(table, fromTimestamp).snapshotId(), 0, false);
}
StreamingOffset offset = determineStartingOffset(table, fromTimestamp);

OutputFile outputFile = io.newOutputFile(initialOffsetLocation);
writeOffset(offset, outputFile);