Skip to content

Conversation

@jackye1995
Copy link
Contributor

@jackye1995 jackye1995 commented Oct 15, 2020

  • refactor the class loader logic in LocationProviders to a util class
  • add functionality to dynamically load FileIO in Hadoop and Hive table operations
  • introduce a catalog property io-impl

HadoopFileIO.class.getName(),
HadoopFileIO.class,
new Class<?>[] { Configuration.class },
new Object[]{ conf });
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is actually cleaner than just using the reflection utilities here. I understand wanting to reuse code, but I don't think that this operation is a good candidate for it.

This mixes property checking with the reflect operation, and as a result the default implementation is now loaded using reflection as well. That means that we lose the compile time check that we're instantiating HadoopFileIO correctly and lose the ability to easily refactor.

This also doesn't support more than one constructor. For LocationProvider, the constructor could accept either location and properties or no args, but this only checks for a single constructor and is harder to read as well.

The first thing I would change is to move the property check out of the utility method and directly instantiate HadoopFileIO in an else block. After making that change, there's little value in wrapping the reflection helpers. The main complexity is catching NoSuchMethodException to return a better error message (which is optional), but making that generic ends up being quite complex:

      List<List<String>> allowedArgTypesString = allowedArgTypes.stream()
          .map(argTypes -> Arrays.stream(argTypes).map(Class::getName).collect(Collectors.toList()))
          .collect(Collectors.toList());
      throw new IllegalArgumentException(String.format(
          "Unable to find a constructor for implementation %s of %s. " +
              "Make sure the implementation is in classpath, and that it either " +
              "has a public no-arg constructor or one of the following constructors: %s ",
          impl, classInterface, allowedArgTypesString), e);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method supports no arg constructor, and also allows using multiple constructor options. The main reason for refactoring is because it has 2 lengthy try catch blocks for NoSuchMethodException when looking for a constructor, and also a ClassCastException when trying to cast the class.

But I agree that it loses the ability to check default implementation at compile time. What if I leave the default class construction in the else blocks, and keep the util to only initialize the dynamic class?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you left the property handling and default classes to else blocks, then the only value that is left is having one method with blocks to throw a little more friendly exception messages. But the cost of that is that both the call and the implementation are quite a bit more complicated and harder to read. I don't really think it is worth the refactor.

current() == null ? new HashMap<>() : current().properties(),
TableProperties.WRITE_FILE_IO_IMPL,
HadoopFileIO.class.getName(),
HadoopFileIO.class,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The interface should be FileIO instead of HadoopFileIO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the same concrete class was always instantiated, I think we used the class in case we needed anything specific to HadoopFileIO. But it can be FileIO since nothing specific is used.

if (defaultFileIo == null) {
defaultFileIo = new HadoopFileIO(conf);
defaultFileIo = ClassLoaderUtil.fromProperty(
current() == null ? new HashMap<>() : current().properties(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should move instantiation to a separate method that accepts TableMetadata so that we can use the correct metadata in transactions.

}

static class DefaultLocationProvider implements LocationProvider {
public static class DefaultLocationProvider implements LocationProvider {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no need to make this public, and I'd like to avoid exposing new classes through the public API. If we need to use reflection, we can always use hiddenImpl to set the accessible flag. But I think it would be better to instantiate these classes directly like before so that we have compile checks.

@jackye1995 jackye1995 requested a review from rdblue October 20, 2020 21:08
*/
public static FileIO loadFileIO(TableMetadata tableMetadata, Configuration conf) {
if (tableMetadata != null) {
Map<String, String> properties = tableMetadata.properties();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we only need properties here. I think that we should probably pass a map of properties to this method instead. That way changes to table metadata won't need to affect this class.

DynConstructors.Ctor<FileIO> ctor;
try {
ctor = DynConstructors.builder(FileIO.class)
.impl(impl, Configuration.class)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would make sense to add an alternative constructor that accepts Map to pass the table properties. We can do this in a follow-up when it's needed, or here if you agree.

} catch (ClassCastException e) {
throw new IllegalArgumentException(
String.format("Cannot initialize FileIO, fail to cast %s to class %s.",
impl, FileIO.class), e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style: does this need to be on a separate line?


public static final String WRITE_LOCATION_PROVIDER_IMPL = "write.location-provider.impl";

public static final String WRITE_FILE_IO_IMPL = "write.file-io.impl";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't just a write option. It controls how files are read as well.

How about using an io namespace instead? Maybe io.file-io.impl or io.impl?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I used write because there is no namespace for both read and write. I can change to io.file-io.impl, io.impl seems too generic in case we want to have something else in the same namespace in the future.

if (fileIO == null) {
fileIO = new HadoopFileIO(conf);
// avoid refresh metadata because refresh() calls io() again
fileIO = CatalogUtil.loadFileIO(getCurrentMetadataNoRefresh(), conf);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about moving this to the base class? Then we wouldn't need to add the "no refresh" method to get metadata and could access current directly. This PR also makes it far less likely that implementations will override io, since a new one can be plugged in easily through table properties.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about doing that, but (1) if it is put to base class, the logic still needs to be duplicated, because HadoopTableOperations does not extend that base class; (2) the method needs to take the Hadoop configuration object, and it seems like the base class tries to not assume an implementation must use Hadoop configuration.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point about Configuration. We don't want to rely on it in the base class. Let me take a closer look at this problem, it's concerning that there's a loop.

Copy link
Contributor Author

@jackye1995 jackye1995 Oct 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I am also just thinking about the loop issue. If someone tries to create a new table, at that point of time currentMetadata is null, so it will choose use the default FileIO to write the initial metadata, and the new FileIO might not be able to read it, which would be a problem.

That means when creating the table it needs to check if the io.file-io.impl is set in the properties and update the FileIO implementation accordingly in the doCommit method. But it feels like an ugly hack...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, and we have to instantiate FileIO before we load the metadata for a table, so we don't really know if we're going to get it right. Seems like we are trying to configure this in the wrong place. Let's talk about where this should go in the sync today.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because Iceberg catalog does not enforce the implementation detail of how table properties are store, an implementation can potentially write the table properties using the io(). Therefore if we try to determine the io() to use based on table property, there is always a chance for cyclic dependency in logic. HadoopCatalog has exactly this problem.

Here are 3 potential ways to solve this problem:

  1. Create a new constructor for each TableOperation that accepts FileIO as an argument. If the constructor is used, then FileIO is set at construction time instead of the first time io() is called. A default implementation can be loaded based on namespace properties. Engines like Spark and Flink can load a configuration key from Hadoop config to load the FileIO outside the core logic. This requires code change at multiple places, including (1) add new constructors in existing table operations, and (2) add support in each engine separately.

  2. Determine FileIO based on the warehouse path scheme. For example, s3:// always use S3FileIO, hdfs:// always use HadoopFileIO. However, it is easy to create a cyclic dependency issue, for example: iceberg-aws module depends on iceberg-core, so HadoopCatalog in iceberg-core cannot import S3FileIO in iceberg-aws. This means we need a registry that allows people to register custom IO mapping at runtime. This approach has a similar amount of work as approach 1, because we need code change in each existing catalog, and each engine needs to find a way to register FIleIO implementation to scheme mapping at JVM start time.

  3. Load FileIO using Hadoop configuration. Because HadoopFileIO is always the default implementation, Hadoop configuration object is always passed in. So user can always just define custom implementation at core-site.xml. This creates a simple solution with no dependency concern. However, this is not an elegant option because ideally we would like to load all Iceberg classes using Iceberg-based configs such as table properties.

Copy link
Contributor

@rdblue rdblue Oct 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One quick thing to note: we want to avoid increasing dependence on a Hadoop configuration. It's fine to pass one where it is required, but we should always make sure there is an alternative and should generally avoid using config from it.

Thanks for writing up the options. Sounds like we have options for configuring a FileIO:

  1. Pass FileIO in from the catalog and use catalog config to initialize it
  2. Instantiate FileIO based on table location or metadata path just before using it
  3. Use config from the environment, like a Hadoop FileSystem

I think the best choice is #1, catalog-level configuration.

We can't use table-level config because that creates a situation where it isn't available to load a table. Working around this requires deciding which FileIO to load based on different information at different times (location for create, metadata location for load), and would also make supplying a custom FileIO implementation in configuration difficult.

Environment-level config doesn't fit with the current model for customizing behavior, where everything is injected through Catalog and Table. Plus, Iceberg has no environment config like Hadoop Configuration and I don't think it is a good idea to add it. I think the most sensible thing is to maintain the chain of configuration: application passes config to catalogs, catalogs pass config to tables.

I don't think it would be too difficult to change over to FileIO passed in from the catalog, but this would mean not basing the implementation on table path. We wouldn't know to choose between S3FileIO or HadoopFileIO for a table and would have to use one for all tables from a catalog, or build a FileIO that knows when to choose between the two. I was already thinking that we would need a delegating implementation, since S3FileIO can't handle non-S3 paths. That should be okay.

@jackye1995
Copy link
Contributor Author

@rdblue So in the latest version, I have added nullable field fileIOImpl as a part of constructor for HadoopTableOperations and HiveTableOperations. The HadoopCatalog, HadoopTables and HiveCatalog uses Hadoop configuration to decide the value, which should be fine because they are already using the configuration to determine some other important configs. Other catalog implementations can use any way to pass in the value.

@jackye1995 jackye1995 changed the title refactor util to dynamically load location provider and file io dynamically load custom FileIO implementation Oct 30, 2020
@jackye1995 jackye1995 requested a review from rdblue October 30, 2020 17:16

public static final String ENGINE_HIVE_ENABLED = "iceberg.engine.hive.enabled";

public static final String CUSTOM_FILE_IO_IMPL = "iceberg.fileio.impl";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a Hadoop config property?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this is used to get the implementation class. Iceberg should never use a Hadoop Configuration for config, except when integrating with an engine that uses it for config. It's okay to store configuration for Hive, but not for Iceberg core.

For FileIO implementation, I think the config should come from the catalog property map.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the comments! I will fix the other parts accordingly. I think this is the place I do not fully understand. Is there a catalog property map? I don't see such a thing in the catalog interface, and that is why I have to use the Hadoop config.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are config properties from both Flink and Spark that we use when constructing the Hive and Hadoop catalogs. And for #1640, we are talking about using an initialization method to set the config using a string map (and a catalog name). While these classes don't pull their own config out of a map, I think of the config as coming from a map of config properties.

Sorry for the confusion. I think we should configure this however the catalog is configured, which will very likely be a string map passed to an init method in the future.

this.conf = conf;
this.createStack = Thread.currentThread().getStackTrace();
this.closed = false;
this.defaultFileIO = new HadoopFileIO(conf);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instantiating HadoopFileIO before initialize is called is because it isn't always called right now?

Configuration hadoopConf,
String warehouseLocation,
Map<String, String> properties) {
return new HadoopCatalogLoader(name, hadoopConf, warehouseLocation, properties);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Flink, I think we should pass the warehouse location and other config through properties. Right now, we pull it out in FlinkCatalogFactory, but it doesn't make much sense to pull out only some properties.

How about leaving the current CatalogLoader.hadoop and CatalogLoader.hive as they are and adding variants like CatalogLoader.hadoop(String name, Map<String, String> properties, Configuration conf)? Then we can pull the properties out using standard config properties that we put in CatalogProperties in the loader classes.

FYI @JingsongLi and @openinx: we're improving how we configure catalogs and allowing some parts to be loaded dynamically. The main change here is passing properties to the catalog as a map.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only see the CatalogLoader.hadoop and CatalogLoader.hive used in a single place FlinkCatalogFactory.createCatalogLoader, that is why I directly changed the class signature. I don't know if there is any benefit in keeping the old ones.

Changing the signature to CatalogLoader.hadoop(String name, Map<String, String> properties, Configuration conf) looks like a good idea that simplifies the code, let me do that first.

(speaking of this, I should also add null check for the properties map and also give a fixed serialization id for those classes because Flink serializes the catalog loader)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. Since these are package-private, I don't think we need to maintain them. I was thinking that people would call these from the public API to configure the source and sink builders, but these aren't public.

We may still want to keep them to avoid changing lots of files in this PR, and I still think it is a good idea to pull out the config here, rather than in FlinkCatalogFactory because we want to move toward catalogs interpreting their own property maps.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also give a fixed serialization id for those classes because Flink serializes the catalog loader

We know that serialization across Iceberg versions may be a problem, but I'm not sure that we want to introduce a serialization id. In general, we avoid this because we want serialization to fail if there are multiple library versions. Java serialization isn't something that we want to use for compatibility across versions. The cases where we have multiple library versions are rare, and we want to design something for a rolling update.

String warehouse,
int clientPoolSize,
Configuration conf,
Map<String, String> properties) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: we prefer args on as few lines as possible, rather than this style.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, sorry I did not see the comments and get a chance to change it before the merge. I guess this kind of issue will be less frequent as I get more familiar with the code style here, but feel free to ping me at any time and I can fix all the style issues before merge 😃

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem! I made this comment just before I merged it. Commits don't need to be perfect. I just wanted to let you know for next time.

public static final String HIVE_URI = "uri";
public static final String HIVE_CLIENT_POOL_SIZE = "clients";
public static final String HIVE_CONF_DIR = "hive-conf-dir";
public static final String WAREHOUSE_LOCATION = "warehouse";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: leaving these in place would have reduce the number of files that this needed to touch, and avoided a possible problem removing public fields. I don't think it's worth blocking for this change, but we like to keep patches as small as possible by not breaking references like these.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants