Core: Add RocksDBStructLikeMap #2680
Conversation
| compile "com.fasterxml.jackson.core:jackson-databind" | ||
| compile "com.fasterxml.jackson.core:jackson-core" | ||
| compile "com.github.ben-manes.caffeine:caffeine" | ||
| compile "org.rocksdb:rocksdbjni" |
I know this is a WIP, but is it possible to avoid bringing this into the class path once this is done?
I believe Flink brings in their own RocksDB fork (frocksdb maybe?) and I imagine others might too.

Since the StructLikeMap is in the core code path, which doesn't depend on any specific compute engine (Spark/Flink/Hive/Presto, etc.), we can't assume that the engine runtime will include this dependency jar. It's better to include it in Iceberg's jar if possible.

That makes sense.
I'd advocate for possibly shading it though, so that multiple versions can exist together.
With Spark 3.2 bringing support for a RocksDB state store as well, it might be wise to shade it in the final outcome. But that is admittedly a long way off.

  RocksDB.loadLibrary();
}

public static RocksDBStructLikeMap create(String path,

Currently, if multiple tasks are located on the same node, they will use the same RocksDB location, which will mess up the spilled data. We will need to create a separate RocksDB directory for each task even when they are located on the same node.

> We will need to create separate rocksdb dir for different tasks even if they are located in the same node.

I think this comment is out of date now because we already generate a unique directory name for each BaseTaskWriter, please see here:
https://github.com/apache/iceberg/pull/2680/files#diff-e550ee80e8343a3396e67ab7fec4fe50a2ac633b02682cdc330d9a2b719e1a5eR61-R68
Yes, I think we could add more RocksDB options (such as the RocksDB write buffer size, etc.) here so that we can control the exact behavior of RocksDB: https://github.com/apache/iceberg/pull/2680/files#diff-125c2d685d98fcab9fcf124dd51f55dafe83462f6aa0462eeb1fe034e136f8afR86-R91
Thanks for the great feedback, @zhougit86!
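
For illustration only, a directory-per-writer scheme could look roughly like the sketch below; the class and method names here are hypothetical and not from this PR, which derives the unique path inside BaseTaskWriter instead.

```java
import java.io.File;
import java.util.UUID;

class SpillLocations {
  // Hypothetical helper: give every writer its own RocksDB directory so that
  // multiple tasks on the same node never share spilled state.
  static File newRocksDbDir(String baseSpillDir) {
    File dir = new File(baseSpillDir, "rocksdb-" + UUID.randomUUID());
    if (!dir.mkdirs() && !dir.isDirectory()) {
      throw new IllegalStateException("Failed to create RocksDB spill dir: " + dir);
    }
    return dir;
  }
}
```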

I think allowing the RocksDB DBOptions and ColumnFamilyOptions to be changed is quite important for debugging and tuning OOM issues with RocksDB. Maybe we should allow the user to supply a config factory, similar to how Flink does with its RocksDB state backend.
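
As a rough sketch of what such a hook could look like (the interface and method names below are hypothetical, loosely modeled on Flink's RocksDBOptionsFactory, and not part of this PR):

```java
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// Hypothetical user-supplied hook for tuning the embedded RocksDB instance.
interface RocksDBConfigFactory {
  DBOptions configure(DBOptions currentOptions);

  ColumnFamilyOptions configure(ColumnFamilyOptions currentOptions);
}

// Example implementation that bumps background jobs and the write buffer size.
class LargeWriteBufferConfig implements RocksDBConfigFactory {
  @Override
  public DBOptions configure(DBOptions current) {
    return current.setMaxBackgroundJobs(4);
  }

  @Override
  public ColumnFamilyOptions configure(ColumnFamilyOptions current) {
    return current.setWriteBufferSize(64L * 1024 * 1024); // 64 MB memtables
  }
}
```

The map factory would invoke these hooks before opening the database, so tuning stays with the user rather than being hard-coded.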

I've been looking at our task writers to implement merge-on-read in Spark, so I should be able to help review this.

Thanks for your review bandwidth, @aokolnychyi!

@openinx Has this patch been verified? Can it be used in a production environment?

@coolderli, I think this PR is ready for review and has fully covered unit tests, but we still haven't merged it because I don't know whether reviewers still have concerns about it. I think you need to do a basic test in a staging env before publishing it in a prod env.

I think the

@stevenzwu, could you help review this?


    record.set(i, null);
  } else {
    byte[] fieldData = new byte[length];
    int fieldDataSize = dis.read(fieldData);

I think this should be int fieldDataSize = length > 0 ? dis.read(fieldData) : 0;, because dis.read() will return -1 when the fieldData length is 0 (e.g. an empty string will cause this) and make the Preconditions.checkState fail.
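
A self-contained sketch of the suggested guard, assuming the field bytes are length-prefixed and that Preconditions refers to Iceberg's relocated Guava class (illustrative only; the PR's actual deserializer differs):

```java
import java.io.DataInputStream;
import java.io.IOException;

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class FieldReads {
  // Skip the read entirely for zero-length fields: reading into an empty buffer
  // can return -1 at end of stream, which would trip the checkState below.
  static byte[] readField(DataInputStream dis, int length) throws IOException {
    byte[] fieldData = new byte[length];
    int fieldDataSize = length > 0 ? dis.read(fieldData) : 0;
    Preconditions.checkState(fieldDataSize == length,
        "Expected %s bytes for the field but read %s", length, fieldDataSize);
    return fieldData;
  }
}
```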

} else if (s instanceof String) {
  encoder.writeString(new Utf8((String) s));
} else if (s instanceof CharSequence) {
  encoder.writeString((CharSequence) s);

Why is this needed? We generally want to avoid writing CharSequence directly because it will require conversion.

public static Map<StructLike, StructLike> load(Types.StructType keyType,
                                               Types.StructType valType,
                                               Map<String, String> properties) {

I don't think this is a good way to control whether the feature is turned on. Passing generic properties all the way down to a factory method for a specific class isn't a good option. Instead, the calling code should decide which map to use. We want to limit where we pass property maps.
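
To make the suggestion concrete, here is a hedged sketch of caller-side selection; the helper class, flag, and supplier below are hypothetical, and the point is only that the writer decides, so no property map reaches the map factory:

```java
import java.util.Map;
import java.util.function.Supplier;

import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMap;

class InsertedRowMaps {
  // Hypothetical helper: the writer, which already knows its own configuration,
  // picks the map implementation instead of passing generic table properties down.
  static Map<StructLike, StructLike> create(Types.StructType keyType, boolean spillToDisk,
                                            Supplier<Map<StructLike, StructLike>> spillableMap) {
    if (spillToDisk) {
      // The supplier would hand back the RocksDB-backed map proposed in this PR.
      return spillableMap.get();
    }
    // Default: the existing in-memory map keyed by the equality fields.
    return StructLikeMap.create(keyType);
  }
}
```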

import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.ByteBuffers;

public class Serializers {

What is the scope of this class? What will use it and how do we avoid exposing it in a confusing way in the API?
Don't we have other methods of serializing data that should be sufficient?

package org.apache.iceberg.types;

public interface Serializer<T> {
  byte[] serialize(T object);

Is it necessary to use byte[]? Normally we use ByteBuffer to avoid the copies necessarily introduced by byte[].
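
For comparison, a ByteBuffer-based variant of the interface might look like the following; this is purely illustrative and not what the patch currently defines:

```java
import java.nio.ByteBuffer;

// Illustrative alternative to the byte[]-based Serializer: returning ByteBuffer
// lets implementations hand out read-only slices or reuse buffers instead of
// copying into fresh arrays.
interface ByteBufferSerializer<T> {
  ByteBuffer serialize(T object);

  T deserialize(ByteBuffer data);
}
```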

@Override
public <T> void set(int pos, T value) {
  put(pos, value);

Why use a private put method instead of moving the switch here in set? Won't this need a SuppressWarnings either way?

@openinx, thanks for working on this. The approach of adding a RocksDBStructLikeMap seems good to me. There are a couple of issues that I think we need to work out before getting into details though:
I think the first thing to do is to get a PR with just the StructLikeMap implementation in, and then we can work on integrating this with the rest of the code.

Thanks for the feedback, @rdblue. I will answer your questions sometime later; the next PR will also reflect the changes.

@openinx Using RocksDB can really solve the OOM problem, but will it make things more complicated? Do users need to participate in tuning RocksDB?

We have used RocksDB in the past with Flink, and it does require some tuning to avoid OOM. For example, we have to change the memory allocator to
It's true that RocksDB introduces more complexity, but it's really a trade-off. Users don't necessarily have to opt in either.
Additionally, RocksDB is arguably the most common state backend for Flink. And as of Spark 3.2, there's a RocksDB state store for structured streaming contributed by Databricks. Given these two things, I don't think the additional settings available are a huge issue. I think the Flink project has done a lot of work to minimize the necessary tuning for RocksDB. With time, similar things could be introduced here. But even making this a possibility will be a really large win for very large jobs that might have a large amount of state or need to otherwise spill to disk.

I agree with you that having RocksDB is a win; our stateful Flink jobs all use the RocksDB backend :) But I think we need to communicate the trade-off to the user more clearly. For example, RocksDB can be an order of magnitude slower than an in-memory hash map even with a very fast SSD, depending on the read/write pattern.

For sure. No disagreement. But we're just maybe not there yet is all I mean. Having the ability to spill to disk is a lot of work, which @openinx and others have been doing a great job with. But even look at the age of this PR: it's moving along, but it's definitely a process. It's good to be aware of how the need to pass configurations will affect the rest of the codebase, as Ryan mentioned that was one of the bigger areas of concern. Ideally we can pass configs to RocksDB without too much disruption to the rest of the codebase. Once that goal has been reached, we can worry more about the user experience when using RocksDB. And I'll happily drop most anything I'm doing to review most documentation PRs! And when the time comes, if you want to write a blog post about using RocksDB, I'd happily review any drafts or make sure it's prominently displayed on the Apache Flink website 🙂 For now, let's focus on getting RocksDB or something similar available to developers within Iceberg, and then we can definitely focus on the user experience. It is definitely true that there are many situations where it makes less sense than remaining in-memory, but having the option sure is nice. Still, it's good to always be thinking about the end-user experience. It's definitely one of my biggest concerns with all things too. So many thanks for that. 😀

(Shameless plug while I have your attention) Also, if you're looking to help out or get more involved, we can definitely use more reviewers on the Flink side of the codebase who have the necessary context with Flink to help properly review code! I definitely appreciate your concern for end users as well. I don't consider myself "well-informed" about much of anything, least of all Flink. But I have experience with it, and I'll check out PRs from GitHub locally (I use the GitHub CLI to clone by PR number) and poke around the Iceberg and Flink source code in my IDE to be able to try to contribute to those conversations. And if you can download it and run some sample jobs locally, even better! That's also always very appreciated. Hope to hear more from you. 😀

Are we still considering this feature? I wonder what the performance of this would be versus in-memory vectorized read + compaction. If the latter is good enough, we can probably avoid introducing this complication.

Discussion came back up, but I'm not sure. Besides other perceived possible benefits from RocksDB (which admittedly might not be reason enough to introduce the added complexity for some use cases), the motivating use case is really the potentially much-larger-than-typical memory usage required to process the initial table snapshot for CDC, which is a common use case. To take a snapshot or begin a CDC stream from MySQL, for example, a full table copy is required before the initial checkpoint can be taken, IIUC. So this has historically led users to majorly increase their initial memory/resource allocation and then tune it back down after the job had successfully checkpointed. I'll follow up with openinx and steven about the status of this feature and the state of the problem in general. Without that issue, I'd lean towards keeping as few elements in state as possible to avoid having to worry about further serialized forms. But I think the issue is still outstanding, as a full snapshot is needed before any deltas can be written, IIUC.

@kbendick @openinx If we are not inclined toward using RocksDB, can we trigger checkpoints based on the number of events, as we have with the S3 Kafka sink connector's flush.size? I am not so sure whether something like this can be done in Flink.

@rdblue @stevenzwu Hi, what do you think of this PR? In my company, there are some big tables, such as TiDB or MySQL binlog tables, that will use Flink to load data into Iceberg. For example, we have a TiDB table that has six hundred million records. If we use Flink streaming mode, it will cost too much time. If we use batch mode, the executor needs a large heap to avoid OOM. Any suggestions about this? I think there are two different problems.

@coolderli Are you using the latest Flink CDC connector and Iceberg to export the stream? I remember the latest Flink CDC connector was refactored to use the Netflix DBLog algorithm to export existing RDBMS records in parallel. So in theory, if we don't have any performance blocker in the Flink -> Iceberg path, there should not be anything that costs too much time. What's the bottleneck in your CDC export path?

@openinx Hello, will OOM not be triggered if we use mysql-cdc 2.0 to sync data to Iceberg, since mysql-cdc 2.0 checkpoints at the chunk level?

Currently, the insertedRowMap in BaseEqualityDeltaWriter is an in-memory hash map, which means it will easily hit OOM if the data set is slightly larger than the memory given to the task manager. For example, if we are migrating the full snapshot of a MySQL table to an Apache Iceberg table, the existing data set from the MySQL table will be quite large, but all those rows will be exported in the same Flink checkpoint, so OOM will easily happen.
In this patch, we are trying to provide a map that is backed by an embedded RocksDB, which means we can spill the rows to disk when they exceed a given threshold. The patch is still a work in progress and will need more test cases before it is ready for review.
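
For readers unfamiliar with the approach, a minimal sketch of the general idea follows. It is not the PR's RocksDBStructLikeMap: the key/value serializer functions are assumed to be supplied by the caller, and the options/database lifecycle is simplified for brevity.

```java
import java.util.function.Function;

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

// Minimal sketch of a disk-backed map: keys and values are converted to bytes by
// caller-supplied functions and stored in an embedded RocksDB instance, so the
// inserted-row state can grow beyond the task manager's heap.
class RocksDBBackedMap<K, V> implements AutoCloseable {
  private final RocksDB db;
  private final Function<K, byte[]> keySerde;
  private final Function<V, byte[]> valSerde;
  private final Function<byte[], V> valDeserde;

  RocksDBBackedMap(String path, Function<K, byte[]> keySerde,
                   Function<V, byte[]> valSerde,
                   Function<byte[], V> valDeserde) throws RocksDBException {
    RocksDB.loadLibrary();
    this.db = RocksDB.open(new Options().setCreateIfMissing(true), path);
    this.keySerde = keySerde;
    this.valSerde = valSerde;
    this.valDeserde = valDeserde;
  }

  void put(K key, V value) throws RocksDBException {
    db.put(keySerde.apply(key), valSerde.apply(value));
  }

  V get(K key) throws RocksDBException {
    byte[] bytes = db.get(keySerde.apply(key));
    return bytes == null ? null : valDeserde.apply(bytes);
  }

  @Override
  public void close() {
    db.close();
  }
}
```

The map proposed in the PR additionally exposes the java.util.Map interface over StructLike keys and values, which is where the Serializers discussed above come in.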