Support Spark Structured Streaming Read for Iceberg #796
Conversation
Thanks for working on this, @jerryshao! I'll take a look.
(Resolved review threads on StreamingOffset.java and StreamingReader.java are collapsed here.)
    long toSnapshotId = snapshotIds.get(i - 1);
    CloseableIterable<CombinedScanTask> iter = buildTableScan()
        .appendsBetween(fromSnapshotId, toSnapshotId)
        .planTasks();
Internally, planTasks calls planFiles, which uses a worker thread-pool to scan manifests and add matching files to a concurrent queue. When using parallel planning, the order of files is non-deterministic and so are the tasks that end up getting created because the order influences how files are combined into tasks.
It looks like this currently relies on deterministic behavior in planTasks. That's because getChanges is called more than once -- both while calculating the end offset and planning tasks for a batch. Even if we were to add a cache to avoid the cost of scan planning more than once, this would still be a problem because tasks could be different when recovering from failure if an index is in the middle of a task list for a snapshot.
I think we need to guarantee deterministic behavior for tasks. I think there are two options:
- Disable parallel scan planning by adding an option to TableScan
- Call planFiles instead of planTasks, but sort the files to produce a deterministic order. Then, split and combine files into tasks.
I prefer option 2 since it still scans table metadata in parallel. We would just need to move the splitting and combining code from BaseTableScan into utility methods.
That should address correctness, but I think we also want to add a cache for the files or splits in each snapshot. We don't want to scan snapshot metadata files more than once if we can avoid it because it is expensive.
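To sketch option 2 (illustrative only; TableScanUtil.planTasks stands in for the split/combine logic that would be moved out of BaseTableScan into utility methods, and the comparator and variable names are assumptions):

```java
// Sketch: plan files in parallel, sort them into a deterministic order,
// then split and combine them into CombinedScanTasks with a shared utility.
List<FileScanTask> files = Lists.newArrayList(scan.planFiles());
files.sort(Comparator.comparing((FileScanTask t) -> t.file().path().toString())
    .thenComparingLong(FileScanTask::start));

CloseableIterable<CombinedScanTask> tasks = TableScanUtil.planTasks(
    CloseableIterable.withNoopClose(files), splitSize, lookback, openFileCost);
```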
Thanks for pointing out the key issue. This part was also the one I was least sure about when implementing this. Let me think it over and change the current approach.
> but I think we also want to add a cache for the files or splits in each snapshot

@rdblue Can I split this out into another PR? I don't think it is closely related to the streaming reader here. Traversing snapshots in the incremental scan is very slow once the table has more than 1k snapshots, so this would be a general improvement; maybe I can open a new PR to address it.
Yes, I agree it should be in a separate PR. No need to over-complicate this one.
@jerryshao, thanks for building this! Overall it looks really good. The only major problem is that …
Thanks a lot @rdblue for your comments. Let me go through all of your comments and refactor the current implementation.
     * A scan task over a range of a single file.
     */
    -public interface FileScanTask extends ScanTask {
    +public interface FileScanTask extends ScanTask, Comparable<FileScanTask> {
Not sure whether this change will break API compatibility.
It shouldn't break compatibility, but I'd prefer not to add it to the interface. First, it isn't necessary to use this because we can use a separate Comparator<FileScanTask> that can be a private implementation. That makes more sense to me because this is an arbitrary ordering of tasks.
Second, while I can't really think of a case where someone would do something strange, this does make it possible. For example, if an implementation chose to override this with return 0 then the source would have non-deterministic behavior. If we can avoid a larger API by using Comparator then I think that's a good idea.
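For example, a private comparator along these lines would keep the ordering out of the public API (an illustrative sketch, not the exact code that ended up in the PR):

```java
// Sketch: order tasks by data file path, then by split offset within the file.
private static final Comparator<FileScanTask> TASK_ORDER =
    Comparator.<FileScanTask, String>comparing(task -> task.file().path().toString())
        .thenComparingLong(FileScanTask::start);
```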
Sure, I agree. Let me change it to use a Comparator.
@rdblue, would you please take another look at this patch at your convenience?
    public MicroBatchReader createMicroBatchReader(Optional<StructType> schema, String checkpointLocation,
                                                   DataSourceOptions options) {
      if (schema.isPresent()) {
        throw new IllegalStateException("Iceberg does not support specifying the schema at read time");
This is fine for now, but we do support it on the batch read path: https://github.com/apache/incubator-iceberg/blob/master/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java#L87-L91
OK, let me see how it works.
      this.initialOffset = calculateInitialOffset();
    }

    this.splitSize = Optional.ofNullable(table.properties().get(TableProperties.SPLIT_SIZE))
In the batch reader, options can override these values. Should that be supported here as well?
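For reference, the batch reader resolves these values roughly like this, so the streaming reader could do the same (a sketch; the "split-size" option name is assumed to match the batch reader's read option):

```java
// Sketch: prefer the read option, then the table property, then the default.
long splitSize = options.getLong("split-size",
    PropertyUtil.propertyAsLong(table.properties(),
        TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT));
```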
    } else {
      LOG.warn("The option snapshot-id {} is not a valid snapshot id, will use the earliest " +
          "snapshot instead", snapshotId);
      this.initialOffset = calculateInitialOffset();
I think this should be an error instead of using the initial offset. The user has expressed a snapshot ID to process from and using the initial offset would not return the rows that the user requested. I think not being able to satisfy the user's query should be an exception instead of a warning. Users don't look at warnings, so this will cause silently incorrect results.
In addition, the initial offset is a start offset, so all existing data is processed. That could easily cause a failure because this will try to load all data files into memory at the same time.
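In other words, something like this instead of the warning (a sketch; the exact message and exception type are up for discussion):

```java
// Sketch: fail fast on an unknown snapshot id instead of silently using the initial offset.
Preconditions.checkArgument(table.snapshot(snapshotId) != null,
    "The option snapshot-id %s is not a valid snapshot id of the table", snapshotId);
```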
In Kafka, if the specified offset does not exist, Kafka automatically falls back to the earliest or latest offset based on configuration. I was trying to provide the same behavior here, but I can change it to throw an exception.

> In addition, the initial offset is a start offset, so all existing data is processed. That could easily cause a failure because this will try to load all data files into memory at the same time.

I'm not sure why this would cause an issue with loading all data files into memory. If we specify a starting offset, the current implementation retrieves the complete data of that snapshot plus the appended files of all following snapshots. That is not very different from traversing from the very beginning of the table. Would you please help explain more?
The case I'm thinking about is when is_starting is true, so the source will try to read all existing data in the table for the first known snapshot. That will cause getChanges to build a scan of all data files, which is then sorted and collected into a list.
To solve this problem, what about changing how we get a reliable ordering for the data files? Because each snapshot is immutable, the data files for a snapshot are always stored across the same set of manifest files. Instead of sorting data files, you could sort the manifest files in a snapshot. Then you'd be able to use ManifestGroup to build an iterable of FileScanTask in a reliable order, without sorting a large list.
That would decrease planning time as well as memory use because this wouldn't need to load all of the data files at one time.
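A rough sketch of that idea (assuming manifest access roughly like snapshot.dataManifests() and ManifestFiles.read(); how the planning logic would actually be exposed, e.g. via ManifestGroup, is still open):

```java
// Sketch: sort the snapshot's manifests by path to get a stable order, then plan
// file tasks manifest by manifest instead of sorting all data files in memory.
List<ManifestFile> manifests = Lists.newArrayList(snapshot.dataManifests());
manifests.sort(Comparator.comparing(ManifestFile::path));

for (ManifestFile manifest : manifests) {
  try (CloseableIterable<DataFile> files = ManifestFiles.read(manifest, table.io())) {
    for (DataFile file : files) {
      // wrap each DataFile into a FileScanTask here, preserving manifest order
    }
  }
}
```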
@rdblue, I'm not sure I understand correctly. If we sort the manifest files in a snapshot, can we still guarantee that the FileScanTasks have a stable order when they are generated by a parallel thread pool?
I get your point now, let me try to change it.
I think the manifest-file approach requires changing several parts of Iceberg core to expose the file-planning logic, and I'm not sure that is an elegant way to go.
- How do we control the amount of data per batch? The semantics would change to "manifests per trigger", and there can be a bunch of data files in one manifest, so how do we meaningfully control the size of each batch?
- The major problem mentioned above can only happen when the reader falls far behind the writer. If the reader can keep up with the writer, each batch will not process too many snapshots, so the planning time should not be long.
So I'm still thinking over this approach of using manifest files and exposing ManifestGroup.
I would defer the change to use ManifestGroup until we figure out a clean way to do it. Adding a cache layer would solve the planning-time issue, but memory usage could still be a problem in some cases. Do you think the current implementation is acceptable? @rdblue
      StreamingOffset end3 = (StreamingOffset) streamingReader.getEndOffset();
      validateOffset(end2, end3);
    }
I think this needs a checkpoint recovery test
    private Boolean readUsingBatch = null;

    // Used to cache the pending batches for this streaming batch interval.
    private Pair<StreamingOffset, List<MicroBatch>> cachedPendingBatches = null;
If my stream fails and I have to restore from a checkpoint, will there be an issue that some of my offsets are now computed differently than previously cached?
No, because we don't do filter pushdown, so the file scan should be the same.
    }

    @Test
    public void testStreamingReadWithSpecifiedSnapshotId() {
There are several other options that can be passed through, and we probably need to verify that all of them work. I counted:
- SPLIT_SIZE
- SPLIT_OPEN_FILE_COST
- SPLIT_LOOKBACK
All of these parameters are verified by other tests and have already been covered by the Spark batch read tests, so there's no need to test them again.
I would feel more comfortable if there were tests here as well, since we may not always have the same implementation under the hood. It's also safest to make sure that the options are properly propagated all the way through, just in case the underlying implementation works but passing the parameters through is bugged in some way.
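For example, a test could start a streaming read that overrides these options and assert on the resulting splits (illustrative; the option names are assumed to match the batch reader's read options):

```java
// Sketch: streaming read with split planning options overridden per query.
Dataset<Row> df = spark.readStream()
    .format("iceberg")
    .option("split-size", String.valueOf(1024L * 1024))  // SPLIT_SIZE override
    .option("lookback", "5")                              // SPLIT_LOOKBACK override
    .option("file-open-cost", "1048576")                  // SPLIT_OPEN_FILE_COST override
    .load(tableLocation);
```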
    Preconditions.checkArgument(maxSizePerBatch > 0L,
        "Option max-size-per-batch '%s' should > 0", maxSizePerBatch);

    this.startSnapshotId = options.get("starting-snapshot-id").map(Long::parseLong).orElse(null);
starting-snapshot-id should have a constant
In the batch query path, the option name is start-snapshot-id. What about using the same name?
Sure, will change it.
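i.e., roughly the following (a sketch; where the constant lives is up to the implementation):

```java
// Sketch: define the option name once, matching the batch reader's option.
private static final String START_SNAPSHOT_ID = "start-snapshot-id";

// ...then reference it where the option is parsed:
this.startSnapshotId = options.get(START_SNAPSHOT_ID).map(Long::parseLong).orElse(null);
```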
    super(table, io, encryptionManager, caseSensitive, options);

    this.table = table;
    this.maxSizePerBatch = options.get("max-size-per-batch").map(Long::parseLong).orElse(Long.MAX_VALUE);
max-size-per-batch also needs a constant
I have a general question about the micro-batch approach. I'm fairly new to this code, but does it make sense to have a batch that only contains some of the manifests in a snapshot? That seems like a state of the table that wouldn't ever be visible from the normal API.
HeartSaVioR left a comment:
Just general comments, as I'm not very familiar with the Iceberg codebase. Some parts remain (including tests). Once the first pass is done I'll try to go through the detailed review, but that will take a while (since I need to learn as well), so it would be nice if the PR could be actively reviewed by other experts too.
     * This method assumes that fromSnapshotId is an ancestor of toSnapshotId
     */
    public static List<Long> snapshotIdsBetween(Table table, long fromSnapshotId, long toSnapshotId) {
      AtomicBoolean isAncestor = new AtomicBoolean(false);
> you can't just have if (snapshotId == fromSnapshotId) throw new IllegalStateException?

That would be odd, as the condition is actually an "expected" one. Even if we threw a custom exception to terminate the traversal, would the snapshots collected in previous calls still be "available" after handling the exception?
By the way, this logic is great for pruning the traversal (an optimization), though I agree the code is less intuitive to understand. Leaving a small code comment would probably make it clearer.
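For context, the method under discussion has roughly this shape (a simplified sketch, not the exact code in the PR):

```java
// Sketch: walk back from toSnapshotId, collecting ids until fromSnapshotId is reached.
// Returns ids in the range (fromSnapshotId, toSnapshotId], newest first.
public static List<Long> snapshotIdsBetween(Table table, long fromSnapshotId, long toSnapshotId) {
  List<Long> snapshotIds = Lists.newArrayList();
  Long current = toSnapshotId;
  while (current != null && current != fromSnapshotId) {
    snapshotIds.add(current);
    Snapshot snapshot = table.snapshot(current);
    current = snapshot != null ? snapshot.parentId() : null;  // stop if the history chain is broken
  }
  return snapshotIds;
}
```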
     * when deserializing from json string.
     * snapshot_id: The snapshot id of StreamingOffset, this is used to record the current processed snapshot of Iceberg
     * table.
     * index: The index of snapshot, this is used to record the processed data file index in this snapshot.
> * index: The index of snapshot, this is used to record the processed data file index in this snapshot.

According to the comment below, wouldn't it be clearer to just replace this with "this is the index of the last scanned file in the snapshot"? The sentence in the javadoc looks unclear.
    try {
      JsonNode node = JsonUtil.mapper().readValue(json, JsonNode.class);
      int version = JsonUtil.getInt(VERSION, node);
      if (version != CURR_VERSION) {
Since the version is 1 it's OK to enforce the same version, but probably we want to support multiple versions afterwards (valid versions would be version <= CURR_VERSION).
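i.e., something like this (a sketch):

```java
// Sketch: accept any known older version instead of requiring an exact match.
if (version < 1 || version > CURR_VERSION) {
  throw new IllegalArgumentException(String.format(
      "Cannot deserialize offset: version %d is not in the supported range [1, %d]", version, CURR_VERSION));
}
```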
      JsonNode node = JsonUtil.mapper().readValue(json, JsonNode.class);
      int version = JsonUtil.getInt(VERSION, node);
      if (version != CURR_VERSION) {
        throw new IOException(String.format("Cannot deserialize a JSON offset from version %d. %d does not match " +
I'd rather throw IllegalStateException or define a new exception to make clear "the version" matters.
      throw new IllegalStateException("Iceberg does not support specifying the schema at read time");
    }

    Configuration conf = new Configuration(lazyBaseConf());
There look to be the same patterns elsewhere in this file, though this PR is already too huge; better to address that in another PR.
    this.startSnapshotId = options.get("starting-snapshot-id").map(Long::parseLong).orElse(null);
    if (startSnapshotId != null) {
      if (!SnapshotUtil.ancestorOf(table, table.currentSnapshot().snapshotId(), startSnapshotId)) {
        throw new IllegalStateException("The option starting-snapshot-id " + startSnapshotId +
I feel this is an exception caused by an invalid argument, so IllegalArgumentException sounds more natural.
      return StreamingOffset.START_OFFSET;
    }

    // Spark will invoke setOffsetRange more than once. If this is already calulated, use the cached one to avoid
nit: calculated
    }

    Preconditions.checkState(cachedPendingBatches != null,
        "pendingBatches is null, which is unexpected as it will be set when calculating end offset");
If this is an "unexpected" condition, IMHO it might be better to provide an imperative message - don't explain the details, just ask the user to report it to the mailing list. The details can be left in a code comment.
This is to check for and fail on an unexpected scenario; normally cachedPendingBatches should not be null.
    for (Long id : snapshotIds) {
      Snapshot snapshot = table.snapshot(id);
      checkOverwrite(snapshot);
      if (!isAppend(snapshot)) {
Probably worth mentioning in the PR description or docs later the limitations that come from the incremental read.
I see two cases: 1. Cannot read from a snapshot with an overwrite operation. 2. Files deleted from the table may still be read afterwards.
Is my understanding correct?
Yes, correct, it is the same behavior as incremental scan.
HeartSaVioR left a comment:
Haven't looked into the tests yet. I'll probably go through the test code once the source side is mostly OK.
    long currentLeftSize = maxSize;
    MicroBatch lastBatch = null;

    checkOverwrite(table.snapshot(startOffset.snapshotId()));
Probably better to have a longer name for clarity, like assertNoOverwrite.
    checkOverwrite(table.snapshot(startOffset.snapshotId()));
    if (shouldGenerateFromStartOffset(startOffset)) {
      currentLeftSize -= generateMicroBatch(startOffset.snapshotId(), startOffset.index(),
Could we avoid using an out parameter if possible? batches is used as an out parameter, which isn't actually needed. generateMicroBatch can just return a new MicroBatch, and calculating its size and adding it to the list can be done here.
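Something along these lines, for illustration (the generateMicroBatch signature and MicroBatch.sizeInBytes() accessor are assumptions; use whatever the class actually exposes):

```java
// Sketch: have generateMicroBatch return the batch; the caller updates the list and size budget.
MicroBatch batch = generateMicroBatch(snapshotId, startIndex, scanAllFiles, currentLeftSize);
batches.add(batch);
currentLeftSize -= batch.sizeInBytes();  // assumed accessor for the batch's total size
```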
    if (currentSnapshotId == snapshotId) {
      // the snapshot of current offset is already the latest snapshot of this table.
      return batches;
    }
The word "between" makes me feel I can retrieve all snapshots including "from" and "to", but according to the javadoc it doesn't look like.
/**
* @return List of snapshot ids in the range - (fromSnapshotId, toSnapshotId]
* This method assumes that fromSnapshotId is an ancestor of toSnapshotId
*/
For me it's a bit unintuitive, but the range notation is there, so that is OK.
Now a new question arises: is a snapshot an ancestor of itself? If the answer is "yes", it should return `to` when from and to are the same. If not, throwing an exception looks OK, but that should be clearly described in the javadoc so callers avoid that case, and it would also be better to document that it throws an exception if from is not an ancestor of to.
        continue;
      }

      int startIndex = lastBatch == null || lastBatch.lastIndexOfSnapshot() ? 0 : lastBatch.endFileIndex();
If I understand correctly, this must always be 0 unless there's a bug, as the code always tries to fill the remaining room with the current snapshot and only rolls to the next snapshot when the current one isn't enough to fill it. The same applies above when reading from the start offset. That said, there might be some opportunity to simplify the code.
      return batches;
    }

    private long generateMicroBatch(long snapshotId, int startIndex, boolean isStart, long currentLeftSize,
As I commented earlier, I'd rather not require passing batches in, and just return the batch. Adding the batch to batches and calculating its size can be done in the caller.
    @SuppressWarnings("checkstyle:HiddenField")
    private boolean shouldGenerateFromStartOffset(StreamingOffset startOffset) {
      return !startOffset.isSnapshotFullyProcessed() &&
          (startOffset.shouldScanAllFiles() || isAppend(table.snapshot(startOffset.snapshotId())));
The condition looks unclear to me (or I may be confused by shouldScanAllFiles).
startOffset.isSnapshotFullyProcessed() == false seems to say Spark should read the remaining parts of the snapshot. Do I understand correctly?
What do the remaining parts of the condition stand for?
Checking isAppend(table.snapshot(startOffset.snapshotId())) doesn't make sense, as the snapshot is immutable and Spark should already have checked it when adding it to StreamingOffset. If we'd like to double-check, please add a precondition instead.
Given that isAppend(table.snapshot(startOffset.snapshotId())) == false doesn't make sense, startOffset.shouldScanAllFiles() doesn't affect the result, though I have no idea what it is and why it's needed here.
OK, I remembered what shouldScanAllFiles is. In the overall logic we only deal with snapshots with an append operation, so the condition still doesn't make sense, but once we assume other kinds of snapshots are allowed, it would probably be valid.
    long currentLeftSize = maxSize;
    MicroBatch lastBatch = null;

    checkOverwrite(table.snapshot(startOffset.snapshotId()));
Do we check whether the snapshot in StreamingOffset still exists? How do we handle it here if the snapshot no longer exists?
Given that this is an incremental read, failing on a non-existing snapshot is OK with me for now, but we should make sure to provide a clear error message.
Maybe we could leave it as is for now, and add more checks in another PR.
Btw, I'm assuming MicroBatch guarantees the ordering of the manifest files and of the input files within each manifest. It looks like it follows the order of the content in the file (manifest list & manifest), so the ordering is guaranteed as of now. If that could change, we either need a way to provide a consistent order across Iceberg versions (e.g. sort by some stable condition), or retrieve the list of files in the snapshot and dump it along with the offset.
(branch updated from a29607d to cd53a0e)
Thanks for the recent update, @jerryshao! Now that 0.10.0 is about ready to go, I think I will have more time to help get this in.
I've found it difficult to review this one, I think because I've ended up losing context between reviews. Right now, the changes are large enough that it takes me a few days to get time to review. And if it takes you a few days to get the time to update, then we end up with quite a while between reviews and it is harder to pick it back up and continue. I think to avoid that, we may want to look for ways to break this into smaller, easier to review changes. Is there anything that you think we can split out right now? The changes to …
Also, I opened #1723 to make the offset a bit simpler; hopefully that helps.
Sure, let me think a bit about how to split this PR into smaller ones.
Hi @rdblue @jerryshao, sorry for taking so long to reply. Because this PR is too big to review and update, I want to split it into some smaller PRs to merge.
Thanks @XuQianJin-Stars for picking this up. Since I don't have much time to continue on this, please go ahead.
This patch adds support for Spark Structured Streaming reads of Iceberg tables [#179]. The key implementation is based on incremental scans, using the number of tasks per batch to control the size of each batch. The basic idea is similar to Delta Lake; the current implementation only tracks the files added by each snapshot, based on the incremental scan.
Currently this patch works smoothly with Spark 2.4, with unit tests added for the key code paths. I would like to bring this initial version out to see whether this is a feasible solution and what else should be changed to improve it. In the meantime I will add more unit tests and do the code refactoring work.
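As a usage sketch of what this enables (the table path and trigger interval are illustrative):

```java
// Sketch: continuously read newly appended snapshots from an Iceberg table
// and write each micro-batch to the console.
StreamingQuery query = spark.readStream()
    .format("iceberg")
    .load("hdfs://nn:8020/warehouse/db/table")
    .writeStream()
    .format("console")
    .option("checkpointLocation", "/tmp/iceberg-stream-checkpoint")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start();
```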