Contributor

@rdblue rdblue commented Dec 20, 2021

The util class SnapshotUtil is shared, so it needs to have fairly strict method contracts. This replaces SnapshotUtil.firstSnapshotAfterTimestamp with oldestAncestorAfter:

  • The new implementation throws IllegalStateException if the correct ancestor cannot be determined
  • The new name is clear that the snapshots considered are ancestors, not all snapshots

This also updates the places that called firstSnapshotAfterTimestamp and attempts to keep the same behavior by catching the IllegalStateException (cannot determine ancestor) and using the oldest ancestor instead. However, in updating the Spark streaming code, I noticed a few bugs:

  • planFiles will call .snapshotId() without checking the snapshot, which can be null if the timestamp is newer than the current snapshot, resulting in a NullPointerException
  • The initial offset store checks for a future timestamp, but planFiles does not
  • The initial offset store handles null fromTimestamp, but planFiles does not
  • The initialFutureStartOffset creates an offset after the current snapshot, but not necessarily after the given future time

I'm also attempting to fix those issues. This simplifies the code by removing several static helper methods. These assisted readability, but made assumptions about whether the table has a current snapshot and so were prone to NullPointerExceptions. Instead, this adds determineStartingOffset that is responsible for getting a starting offset or returning StreamingOffset.START_OFFSET if it cannot be determined because of the table state or a timestamp in the future.

Now, StreamingOffset.START_OFFSET means that the job cannot start because the start offset has not been determined, and there is no "initial future offset".
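The decision flow described above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the code in this PR: `StartingOffsetSketch`, `Offset`, and `TableState` are hypothetical stand-ins for the Spark source, `StreamingOffset`, and the Iceberg table, and starting from the oldest ancestor when `fromTimestamp` is null is an assumption about the intended behavior.

```java
final class StartingOffsetSketch {
  // Hypothetical stand-in for StreamingOffset: only the snapshot ID matters here.
  static final class Offset {
    static final Offset START_OFFSET = new Offset(-1L); // placeholder: start not yet known
    final long snapshotId;

    Offset(long snapshotId) {
      this.snapshotId = snapshotId;
    }
  }

  // Hypothetical view of the table state that the decision needs.
  interface TableState {
    Long currentSnapshotTimestamp();     // null if the table has no snapshots
    long oldestAncestorId();             // oldest retained ancestor of the current snapshot
    Long oldestAncestorAfterId(long ts); // null if none; may throw IllegalStateException
  }

  static Offset determineStartingOffset(TableState table, Long fromTimestamp) {
    Long currentTs = table.currentSnapshotTimestamp();
    if (currentTs == null) {
      return Offset.START_OFFSET; // no snapshots yet, so the start cannot be determined
    }
    if (fromTimestamp == null) {
      // Assumption: with no timestamp, start from the oldest ancestor.
      return new Offset(table.oldestAncestorId());
    }
    if (currentTs < fromTimestamp) {
      return Offset.START_OFFSET; // timestamp is in the future
    }
    try {
      Long id = table.oldestAncestorAfterId(fromTimestamp);
      return id == null ? Offset.START_OFFSET : new Offset(id);
    } catch (IllegalStateException e) {
      // The correct ancestor cannot be determined; fall back to the oldest ancestor.
      return new Offset(table.oldestAncestorId());
    }
  }
}
```

In this sketch the placeholder is returned only when the start genuinely cannot be determined, so a caller can compare against `START_OFFSET` and retry on a later batch instead of dereferencing a null snapshot.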

// Return the oldest snapshot if the target timestamp is less than the oldest snapshot of the table
return lastSnapshot;
if (lastSnapshot != null && lastSnapshot.parentId() == null) {
// this is the first snapshot in the table, return it
Member

I am a little worried about having a function which works for a given input but only until the starting snapshot is expired. For example

oldestAncestorAfter(table,  Long.MinValue) // Returns first snapshot
expireSnapshots() // Expire first snapshot
oldestAncestorAfter(table,  Long.MinValue) // Throws exception

I think if we want to standardize, this should probably also throw an exception.

Contributor Author

I see your point here, but the result is based on the table state that gets passed in. If the table state is missing information, then we can't make it consistent.

Here's another way to think about it:

t1 = commitSnapshotOne()
t2 = commitSnapshotTwo()
oldestAncestorAfter(table, Long.MinValue) // returns snapshot one
expireSnapshots(t2 - 1)
oldestAncestorAfter(table, Long.MinValue) // returns snapshot two

I think that the behavior above is worse than throwing an exception based on the table state because it is silently inconsistent. At least throwing an exception tells you why it isn't returning the expected value.
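To make the trade-off in this exchange concrete, here is a minimal sketch on a toy in-memory model rather than Iceberg's `Table` and `Snapshot` classes. `AncestorSketch`, `Snap`, `commit`, and `expire` are hypothetical names; the traversal mirrors the diff hunks quoted in this thread but is not the merged implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: a snapshot knows its parent's ID, and the "table" retains only
// the snapshots that have not been expired.
final class AncestorSketch {
  static final class Snap {
    final long id;
    final Long parentId; // null for the very first snapshot
    final long timestampMillis;

    Snap(long id, Long parentId, long timestampMillis) {
      this.id = id;
      this.parentId = parentId;
      this.timestampMillis = timestampMillis;
    }
  }

  final Map<Long, Snap> retained = new HashMap<>();
  Long currentId = null;

  void commit(long id, long timestampMillis) {
    retained.put(id, new Snap(id, currentId, timestampMillis));
    currentId = id;
  }

  void expire(long id) {
    retained.remove(id);
  }

  // Walk from the current snapshot toward older ancestors.
  Snap oldestAncestorAfter(long timestampMillis) {
    if (currentId == null) {
      return null; // no snapshots, so no ancestors
    }
    Snap last = null;
    Snap snap = retained.get(currentId);
    while (snap != null) {
      if (snap.timestampMillis < timestampMillis) {
        return last; // the child of the first strictly older snapshot
      } else if (snap.timestampMillis == timestampMillis) {
        return snap; // exact match, no need to look at the parent
      }
      last = snap;
      snap = snap.parentId == null ? null : retained.get(snap.parentId);
    }
    if (last != null && last.parentId == null) {
      return last; // reached the table's first snapshot
    }
    // The walk stopped at a snapshot whose parent is no longer retained.
    throw new IllegalStateException(
        "Cannot determine the oldest ancestor after " + timestampMillis);
  }
}
```

With snapshots 1 and 2 committed, `oldestAncestorAfter(Long.MIN_VALUE)` returns snapshot 1. After `expire(1L)`, the same call throws because snapshot 2 still records a parent that is missing, which is the "tells you why" behavior argued for above.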

Member

I agree that we should change it; I just think we can always throw the exception, so:

oldestAncestorAfter(table,  Long.MinValue) // Throw exception
expireSnapshots() // Expire first snapshot
oldestAncestorAfter(table,  Long.MinValue) // Throw exception

Member

@RussellSpitzer RussellSpitzer left a comment

I'm fine with these changes, although I think we should have some better names and avoid using "after" and "before" when talking about traversals and time in the same docs.

I think we should also definitely add the tests that would have exposed the NPEs and the other issues, so that they don't come back.

* @return the first snapshot which satisfies {@literal >=} targetTimestamp, or null if the current snapshot is
* more recent than the target timestamp
* @param timestampMillis a timestamp in milliseconds
* @return the first snapshot after the given timestamp, or null if the current snapshot is older than the timestamp
Member

I would add the word "committed" again here and before all the "afters". I don't know why I can't handle both concepts in my head at the same time, but I keep visualizing a linked list of snapshots.

StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ?
new StreamingOffset(SnapshotUtil.firstSnapshotAfterTimestamp(table, fromTimestamp).snapshotId(), 0, false) :
startOffset;
determineStartingOffset(table, fromTimestamp) : startOffset;
Member

@RussellSpitzer RussellSpitzer Dec 20, 2021

I wonder if it would be simpler if we just drop the ternary operator here and have "suppliedOffset" be a parameter like

private Offset determineStartingOffset(table, fromTimestamp, startOffset) {
  // a concrete offset from a previous batch wins over the placeholder
  if (startOffset != null && !StreamingOffset.START_OFFSET.equals(startOffset)) {
    return startOffset;
  }
  ... logic
}

private Offset determineStartingOffset(table, fromTimestamp) {
  return determineStartingOffset(table, fromTimestamp, null);
}

Contributor Author

I think that the problem is that we have 3 "start" offsets here:

  1. StreamingOffset.START_OFFSET is a fake offset for when we don't have enough information, like when the table has no snapshots or the timestamp is in the future
  2. The "starting" offset referred to by determineStartingOffset is the concrete replacement for START_OFFSET, if there is enough information to start; otherwise it returns START_OFFSET as a placeholder again
  3. startOffset is the known starting point for this batch, from the last batch's end offset or initialization

I don't think that it makes sense to mix startOffset into the method because they're different things and determineStartingOffset is already fairly complicated. We could have a separate method, determineBatchStartOffset instead, but that would basically be this expression.

A better fix for this confusion is probably to rename the method. Maybe determineInitialOffset is more clear?

Member

@szehon-ho szehon-ho left a comment

Just some questions about the boundary case, sorry if I'm missing the context

* @return the first snapshot which satisfies {@literal >=} targetTimestamp, or null if the current snapshot is
* more recent than the target timestamp
* @param timestampMillis a timestamp in milliseconds
* @return the first snapshot after the given timestamp, or null if the current snapshot is older than the timestamp
Member

I'm trying to understand the removal of ">=" from this and other comments, is it intended?

If so, it doesn't seem to match the code below, which still seems to return ">=":

else if (snapshot.timestampMillis() == timestampMillis)
   return snapshot;

Contributor Author

I replaced the previous Javadoc with the original one that I wrote. I can be more clear here if needed.

Member

@szehon-ho szehon-ho Dec 20, 2021

OK, got it that it's a new method rather than a change. To me the previous Javadoc is clearer, as "older" gives the impression of strictly greater than.

if (snapshot.timestampMillis() < timestampMillis) {
return lastSnapshot;
} else if (snapshot.timestampMillis() == timestampMillis) {
return snapshot;
Member

@szehon-ho szehon-ho Dec 20, 2021

Maybe I'm missing something, but I'm curious: what case does this new condition fix?

Even in the old code, it seems like it would return a snapshot which satisfies timestamp >= snapshot.timestamp?

Contributor Author

This isn't changing the method as much as completely replacing the old code with a different implementation that I suggested originally. I didn't want to go through and figure out what had changed and why, I just wanted to make it work.

The clause here catches the case where the current snapshot is the one to return because its timestamp matches the requested timestamp. In that case, there's no need to have an earlier parent so we short-circuit early.

Member

OK, thanks for context that it's a new method rather than changing the existing one.

The previous code is slightly easier (one less case for the reader to understand, and in my understanding it still works but just takes one more cycle), but it's not a big deal as the new case is straightforward.

If it were me, I'd prefer the clarity of Stream methods, but I guess we do mostly manual traversals in Iceberg due to performance.

StreamSupport.stream(currentAncestors(table).spliterator(), false).filter(s -> s.timestampMillis() <= timestampMillis).findFirst()

Contributor Author

In this case, we're also more carefully traversing the ancestors. We aren't finding the snapshot before timestampMillis; we're finding the child of the first snapshot before timestampMillis. And if the parent doesn't exist, it throws an exception.
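The difference between the two approaches can be shown on a plain list of ancestor timestamps ordered newest to oldest. This is only an illustrative sketch: `TraversalContrast` and both method names are hypothetical, and the manual version leaves out the check that the history is complete, which the real method performs before returning the last snapshot it saw.

```java
import java.util.List;
import java.util.Optional;

final class TraversalContrast {
  // Stream version: the newest ancestor committed at or before the timestamp.
  static Optional<Long> newestAtOrBefore(List<Long> ancestors, long timestampMillis) {
    return ancestors.stream().filter(ts -> ts <= timestampMillis).findFirst();
  }

  // Manual version: the oldest ancestor committed at or after the timestamp,
  // that is, the child of the first ancestor that is strictly older.
  static Long oldestAtOrAfter(List<Long> ancestors, long timestampMillis) {
    Long last = null;
    for (Long ts : ancestors) {
      if (ts < timestampMillis) {
        return last;
      } else if (ts == timestampMillis) {
        return ts;
      }
      last = ts;
    }
    // Ran out of ancestors. This sketch simply returns the oldest one seen.
    return last;
  }
}
```

For ancestors committed at 300, 200, and 100 and a timestamp of 150, the stream version yields 100 while the manual traversal yields 200; the two agree only when a snapshot was committed exactly at the requested timestamp.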

Contributor

@kbendick kbendick left a comment

This PR would affect this added documentation PR, I believe: #3732

Member

@RussellSpitzer RussellSpitzer left a comment

Looks good to me! I do think this ends up being a lot more readable

@rdblue rdblue merged commit 5009949 into apache:master Dec 21, 2021
Contributor Author

rdblue commented Dec 21, 2021

Thanks, @RussellSpitzer! It's looking a lot better now.

@RussellSpitzer
Member

@me Reminder to back port test changes

RussellSpitzer added a commit that referenced this pull request Jan 21, 2022
4 participants