Core: Add SerializableTable #2403
Conversation
@openinx @pvary @yyanyy @rdblue @RussellSpitzer @jackye1995, could you take a look, please?
core/src/test/java/org/apache/iceberg/hadoop/TestTableSerialization.java (outdated; comments resolved)
  TableOperations ops = ((HasTableOperations) table).operations();
  return ops.current().metadataFileLocation();
} else {
  return null;
Tables that don't implement HasTableOperations will not be able to load full metadata but would still be serialized and deserialized correctly.
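Reconstructed as a full helper for readability (the surrounding lines are cut off in the diff above, so the method name and exact shape are assumed):

  // Assumed shape of the helper the diff fragment above belongs to.
  private static String metadataFileLocation(Table table) {
    if (table instanceof HasTableOperations) {
      TableOperations ops = ((HasTableOperations) table).operations();
      return ops.current().metadataFileLocation();
    } else {
      // no TableOperations: serialize the table anyway, just without access to full metadata
      return null;
    }
  }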
RussellSpitzer left a comment
LGTM
Force-pushed from 5802094 to 29075c0.
  }
}

private FileIO fileIO(Table table) {
I've changed this place to handle FileIO instead of delegating it to the caller.
We rely on SerializedConfiguration, which is introduced in this PR.
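As a rough illustration of that direction (a sketch, not the exact code in this PR), the helper could wrap the Hadoop conf for HadoopFileIO and pass everything else through unchanged; it relies on the SerializedConfiguration introduced here and on HadoopFileIO accepting a Configuration supplier, as in the snippets further below:

  // Sketch only; exact handling in the PR may differ.
  private FileIO fileIO(Table table) {
    if (table.io() instanceof HadoopFileIO) {
      Configuration conf = ((HadoopFileIO) table.io()).getConf();
      SerializedConfiguration serializedConf = new SerializedConfiguration(conf);
      return new HadoopFileIO(serializedConf::get);
    } else {
      return table.io();
    }
  }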
This can be made more generic in the future. We can expose a ConfigurableFileIO interface or something similar instead of handling only HadoopFileIO.
We can do something similar to dynamic loading and use if (table.io() instanceof Configurable).
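A tiny sketch of that idea (illustrative only; it assumes the captured conf can simply be pushed back via setConf()):

  // Hypothetical check, mirroring how dynamically loaded classes are configured.
  FileIO io = table.io();
  if (io instanceof Configurable) {
    ((Configurable) io).setConf(serializedConf.get());
  }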
Yeah, we would need a way to construct a copy of the class with the serialized conf.
If we knew that all FileIO implementations can be dynamically loaded, we could simply persist the class, the props, and an optional conf, and rebuild the FileIO on demand. However, we cannot guarantee that every FileIO implementation can be dynamically loaded.
Ah I see, the FileIO has to directly accept the serialized Hadoop config supplier. I thought it would be enough after #2333, but setConf() there still needs to accept a Hadoop config and would not work with the supplier.
In that case, I think we can potentially have something like a SerializedConfigurable interface with a method setSerializedConfSupplier(supplier), and check instanceof that interface. HadoopFileIO can then implement that interface instead.
The main point here is that if we do it for HadoopFileIO only, I think we should at least make sure other FileIOs that leverage the Hadoop configuration can work with this.
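A sketch of that proposal (the interface and method names come from the comment above, the rest is assumed):

  // Hypothetical interface, not part of this PR.
  public interface SerializedConfigurable {
    void setSerializedConfSupplier(SerializableSupplier<Configuration> supplier);
  }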
Yeah, we would definitely need a more generic solution for FileIO implementations that depend on Hadoop conf.
I tried to prototype that but it raised a number of questions. The last idea I had was something like this:
interface ConfigurableFileIO<T extends ConfigurableFileIO> extends FileIO, Configurable {
T toSerializable(SerializedConfiguration serializedConf);
default Object writeReplace() {
// default the implementation to call `toSerializable`
}
}
It was clear this requires a separate PR and more thinking. That's why I propose to address that in a follow-up.
Cool, let's discuss in another PR then. Just to throw out another idea I just had:
public interface KryoSerializable<T> extends Serializable {
  // return a serialized version of self
  default T serialized() {
    throw new UnsupportedOperationException("Cannot support kryo serialization");
  }
}

public interface FileIO extends KryoSerializable<FileIO> {
  ...
}

public class HadoopFileIO ... {
  @Override
  public FileIO serialized() {
    SerializedConfiguration serializedConf = new SerializedConfiguration(getConf());
    return new HadoopFileIO(serializedConf::get);
  }
}

By doing this, we enforce everything dynamically loaded to be kryo serializable.
I don't think that we should change the contract for dynamically loaded classes. If a user chooses to use Kryo and a dynamically loaded component, it is their responsibility to make sure the two are compatible. We just need to make sure that Iceberg-supplied classes work with Kryo.
 * Hadoop configuration will be propagated to the captured state once an instance
 * of {@link SerializedConfiguration} is constructed.
 */
public class SerializedConfiguration implements Serializable {
We cannot simply replace the internal implementation of the existing SerializableConfiguration class because a Hadoop conf is not immutable and can change dynamically. For example, getting a FileSystem triggers a full reload of configs. That's why I added SerializedConfiguration, which is used only in SerializedTable for now.
Later, we can use SerializedConfiguration in writeReplace of SerializableConfiguration.
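That follow-up could look roughly like this (a sketch, not part of this PR; the reading side would also need to resolve back to a live Configuration, e.g. via readResolve or an explicit get()):

  // Inside the existing SerializableConfiguration: swap in the map-based form during
  // Java serialization so only the captured entries are written out.
  private Object writeReplace() throws ObjectStreamException {
    return new SerializedConfiguration(get());
  }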
This approach reduced the size of the serialized Hadoop conf by 50%, as we don't serialize the source of the conf.
Also, this class is Kryo compatible, so we don't have to wrap it in special classes like we do today in Spark.
@RussellSpitzer @jackye1995, could you take one more look? I've updated the approach to make …
public SerializedConfiguration(Configuration conf) {
  this.confAsMap = Maps.newHashMapWithExpectedSize(conf.size());
  conf.forEach(entry -> confAsMap.put(entry.getKey(), entry.getValue()));
I think this should also set conf to the one that is passed in. That way this won't create a new configuration if get is called before the object is serialized.
If we do this, there will be no guarantee that the conf and the map are in sync. This class should be used right before serialization and should not be accessed before it is serialized.
I moved this class to SerializedTable to hide it and make sure nobody uses it.
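For reference, a minimal sketch of what such a hidden class can look like, assuming get() is called only after deserialization and rebuilds a fresh Configuration from the captured map (the `new Configuration(false)` detail is an assumption, not taken from the PR):

  import java.io.Serializable;
  import java.util.Map;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.iceberg.relocated.com.google.common.collect.Maps;

  // Sketch only; the real class is hidden inside the serializable table implementation.
  class SerializedConfiguration implements Serializable {
    private final Map<String, String> confAsMap;

    SerializedConfiguration(Configuration conf) {
      this.confAsMap = Maps.newHashMapWithExpectedSize(conf.size());
      conf.forEach(entry -> confAsMap.put(entry.getKey(), entry.getValue()));
    }

    // rebuilds a Configuration after deserialization; not meant to be called before serializing
    Configuration get() {
      Configuration conf = new Configuration(false); // skip default resources; all entries were captured
      confAsMap.forEach(conf::set);
      return conf;
    }
  }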
Since I redesigned the original approach, we may consider making serialized table classes non-public and offering a factory or a util class to build them instead. We will need to construct serialized tables manually for Kryo. I don't want to call it a util as it has to be in …
rdblue left a comment
Looks good to me. Thanks for pushing this forward, @aokolnychyi!
Force-pushed from 92e293a to 4d5e014.
 * @param table the original table to copy the state from
 * @return a read-only serializable table reflecting the current state of the original table
 */
public static Table copyOf(Table table) {
Query engines that need Kryo support will call this method manually.
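For instance, on the Spark side the call could look roughly like this (a hypothetical call site; the class name matches this revision of the PR):

  // Hypothetical usage in an engine that serializes tasks with Kryo.
  // sparkContext is a JavaSparkContext.
  Table serializableTable = SerializableTableFactory.copyOf(table);
  Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);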
/**
 * A factory to create serializable tables.
 */
public class SerializableTableFactory {
I went with a factory instead of a util class so that I can have it in org.apache.iceberg and don't need to open up our base metadata table class.
This doesn't look like a traditional factory class. We probably don't need this wrapper and can directly make SerializableTable a public class.
Then we can either have a constructor like SerializableTable(Table t) or a public static SerializableTable#copyOf(Table t).
We have two table implementations: one for base tables and one for metadata tables. I don't think making those two classes public is a good idea. We need to do the switch somewhere and expose a method for building serializable tables.
While we could name the class SerializableTable and do the switch there, it may be a bit weird that it would not implement Table. Let me think more about this. Maybe we can make the metadata serializable table a nested class.
SerializableTable.copyOf(table)
SerializableTables.copyOf(table)
SerializableTableFactory.copyOf(table)
SerializableTables.copyOf(table) looks more accurate than Factory to me.
I've updated to SerializableTable.copyOf(table) and made the metadata table a private nested class.
Could you take one more look, @stevenzwu?
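Based on the discussion, the resulting shape is roughly the following (a structural sketch only, not the exact final code; member details are elided):

  // Sketch: a single public entry point that hides both implementations.
  public class SerializableTable implements Table, Serializable {

    public static Table copyOf(Table table) {
      if (table instanceof BaseMetadataTable) {
        return new SerializableMetadataTable((BaseMetadataTable) table);
      } else {
        return new SerializableTable(table);
      }
    }

    // ... captured, read-only table state ...

    private static class SerializableMetadataTable extends SerializableTable {
      // also captures what is needed to reload the metadata table
    }
  }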
I was not entirely happy with the implementation. I've decided to introduce a factory that should be used by query engines and hide the table implementations. Will need another review round.
import org.apache.spark.SparkConf;
import org.apache.spark.serializer.KryoSerializer;

public class KryoHelpers {
Why are this and the other test files in the spark module?
We depend on Kryo from Spark. Iceberg does not bundle it.
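For context, such a helper is essentially a round-trip through Spark's KryoSerializer; a minimal sketch, assuming the kryo classes Spark ships on the test classpath:

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.IOException;
  import com.esotericsoftware.kryo.Kryo;
  import com.esotericsoftware.kryo.io.Input;
  import com.esotericsoftware.kryo.io.Output;
  import org.apache.spark.SparkConf;
  import org.apache.spark.serializer.KryoSerializer;

  public class KryoHelpers {

    private KryoHelpers() {
    }

    // serialize and immediately deserialize with the Kryo instance Spark would use
    @SuppressWarnings("unchecked")
    public static <T> T roundTripSerialize(T obj) throws IOException {
      Kryo kryo = new KryoSerializer(new SparkConf()).newKryo();

      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (Output out = new Output(bytes)) {
        kryo.writeClassAndObject(out, obj);
      }

      try (Input in = new Input(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) kryo.readClassAndObject(in);
      }
    }
  }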
Got it. But those tests don't seem tied to the Spark engine. Ideally, they should probably live in the iceberg-core module. It is probably fine to add Kryo as a test compile dependency in iceberg-core?
I am afraid each query engine has its own specifics. For example, Spark adds custom serializers for handling unmodifiable Java collections while Flink does not, which led to exceptions on the Flink side.
There are a number of Kryo-related suites in the query engine modules. I'd be up for refactoring, but probably in a separate PR.
sounds good. thx a lot for the context
Thanks for reviewing, @stevenzwu @rdblue @RussellSpitzer @jackye1995!
This PR changes our table serialization to avoid sending extra requests to access frequently needed metadata.