diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/catalog/CatalogOptions.java b/hudi-flink/src/main/java/org/apache/hudi/table/catalog/CatalogOptions.java
new file mode 100644
index 0000000000000..842f2757af39c
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/catalog/CatalogOptions.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+
+import java.util.Map;
+
+/**
+ * Hoodie catalog options.
+ */
+public class CatalogOptions {
+
+  /** Base DFS path of the catalog; it has no default value. */
+  public static final ConfigOption<String> CATALOG_PATH =
+      ConfigOptions.key("catalog.path")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("Catalog base DFS path, used for inferring the sink table path. "
+              + "The default strategy for a table path is: ${catalog.path}/${db_name}/${table_name}");
+
+  /** Name of the catalog default database, falls back to {@code default}. */
+  public static final ConfigOption<String> DEFAULT_DATABASE =
+      ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+          .stringType()
+          .defaultValue("default")
+          .withDescription("Name of the default database of the catalog.");
+
+  /**
+   * Returns all the common table options that can be shared.
+   *
+   * <p>Works on a copy, so the passed-in configuration is left untouched; the
+   * catalog-only keys {@link #DEFAULT_DATABASE} and {@link #CATALOG_PATH} are dropped.
+   *
+   * @param catalogOptions The catalog options
+   * @return the remaining options as a plain key/value map
+   */
+  public static Map<String, String> tableCommonOptions(Configuration catalogOptions) {
+    Configuration copied = new Configuration(catalogOptions);
+    copied.removeConfig(DEFAULT_DATABASE);
+    copied.removeConfig(CATALOG_PATH);
+    return copied.toMap();
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
new file mode 100644
index 0000000000000..3ad3deab817ac
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hudi.table.catalog; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.StreamerUtil; + +import org.apache.avro.Schema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import 
org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.StringUtils; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH; +import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE; + +/** + * Catalog that can set up common options for underneath table. 
+ */
+public class HoodieCatalog extends AbstractCatalog {
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieCatalog.class);
+
+  private final org.apache.hadoop.conf.Configuration hadoopConf;
+  private final String catalogPathStr;
+  // catalog level options that are merged into the options of every created table
+  private final Map<String, String> tableCommonOptions;
+
+  // both are assigned in #open
+  private Path catalogPath;
+  private FileSystem fs;
+
+  /**
+   * Creates the catalog; no file system access happens before {@link #open()}.
+   *
+   * @param name    The catalog name
+   * @param options The catalog options, see {@link CatalogOptions}
+   */
+  public HoodieCatalog(String name, Configuration options) {
+    super(name, options.get(DEFAULT_DATABASE));
+    this.catalogPathStr = options.get(CATALOG_PATH);
+    this.hadoopConf = StreamerUtil.getHadoopConf();
+    this.tableCommonOptions = CatalogOptions.tableCommonOptions(options);
+  }
+
+  /**
+   * Resolves the file system of the catalog path and verifies that the path exists.
+   *
+   * @throws CatalogException if the path is missing or can not be checked
+   */
+  @Override
+  public void open() throws CatalogException {
+    fs = FSUtils.getFs(catalogPathStr, hadoopConf);
+    catalogPath = new Path(catalogPathStr);
+    try {
+      if (!fs.exists(catalogPath)) {
+        throw new CatalogException(String.format("Catalog %s path %s does not exist.", getName(), catalogPathStr));
+      }
+    } catch (IOException e) {
+      throw new CatalogException(String.format("Checking catalog path %s exists exception.", catalogPathStr), e);
+    }
+  }
+
+  /**
+   * Closes the file system handle obtained in {@link #open()}.
+   */
+  @Override
+  public void close() throws CatalogException {
+    if (fs == null) {
+      // #open was never called, or failed before the file system was resolved
+      return;
+    }
+    try {
+      fs.close();
+    } catch (IOException e) {
+      throw new CatalogException("Closing FileSystem exception.", e);
+    }
+  }
+
+  // ------ databases ------
+
+  /**
+   * Lists the databases: every directory right under the catalog path counts as one.
+   */
+  @Override
+  public List<String> listDatabases() throws CatalogException {
+    try {
+      return listSubDirectories(catalogPath);
+    } catch (IOException e) {
+      throw new CatalogException("Listing database exception.", e);
+    }
+  }
+
+  @Override
+  public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
+    if (!databaseExists(databaseName)) {
+      throw new DatabaseNotExistException(getName(), databaseName);
+    }
+    // databases carry neither properties nor comment
+    return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+  }
+
+  @Override
+  public boolean databaseExists(String databaseName) throws CatalogException {
+    checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+    return listDatabases().contains(databaseName);
+  }
+
+  /**
+   * Creates the database directory under the catalog path.
+   *
+   * @throws CatalogException if the database declares properties or the directory can not be created
+   */
+  @Override
+  public void createDatabase(String databaseName, CatalogDatabase catalogDatabase, boolean ignoreIfExists)
+      throws DatabaseAlreadyExistException, CatalogException {
+    if (databaseExists(databaseName)) {
+      if (ignoreIfExists) {
+        return;
+      }
+      throw new DatabaseAlreadyExistException(getName(), databaseName);
+    }
+
+    if (!CollectionUtil.isNullOrEmpty(catalogDatabase.getProperties())) {
+      throw new CatalogException("Hudi catalog doesn't support to create database with options.");
+    }
+
+    Path dbPath = new Path(catalogPath, databaseName);
+    try {
+      fs.mkdirs(dbPath);
+    } catch (IOException e) {
+      throw new CatalogException(String.format("Creating database %s exception.", databaseName), e);
+    }
+  }
+
+  /**
+   * Recursively deletes the database directory.
+   *
+   * @throws DatabaseNotEmptyException if the database still holds tables and {@code cascade} is false
+   * @throws IllegalArgumentException  if the default database is about to be dropped
+   */
+  @Override
+  public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade)
+      throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+    if (!databaseExists(databaseName)) {
+      if (ignoreIfNotExists) {
+        return;
+      }
+      throw new DatabaseNotExistException(getName(), databaseName);
+    }
+
+    List<String> tables = listTables(databaseName);
+    if (!tables.isEmpty() && !cascade) {
+      throw new DatabaseNotEmptyException(getName(), databaseName);
+    }
+
+    if (databaseName.equals(getDefaultDatabase())) {
+      throw new IllegalArgumentException(
+          "Hudi catalog doesn't support to drop the default database.");
+    }
+
+    Path dbPath = new Path(catalogPath, databaseName);
+    try {
+      fs.delete(dbPath, true);
+    } catch (IOException e) {
+      throw new CatalogException(String.format("Dropping database %s exception.", databaseName), e);
+    }
+  }
+
+  @Override
+  public void alterDatabase(String databaseName, CatalogDatabase catalogDatabase, boolean ignoreIfNotExists)
+      throws DatabaseNotExistException, CatalogException {
+    throw new UnsupportedOperationException("Altering database is not implemented.");
+  }
+
+  // ------ tables ------
+
+  /**
+   * Lists the tables of a database: every directory right under the database path counts as one.
+   */
+  @Override
+  public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
+    if (!databaseExists(databaseName)) {
+      throw new DatabaseNotExistException(getName(), databaseName);
+    }
+
+    Path dbPath = new Path(catalogPath, databaseName);
+    try {
+      return listSubDirectories(dbPath);
+    } catch (IOException e) {
+      throw new CatalogException(String.format("Listing table in database %s exception.", dbPath), e);
+    }
+  }
+
+  /**
+   * Rebuilds the catalog table from the latest table avro schema and the stored option properties.
+   *
+   * @throws TableNotExistException if the table is missing or its schema can not be resolved
+   */
+  @Override
+  public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
+    if (!tableExists(tablePath)) {
+      throw new TableNotExistException(getName(), tablePath);
+    }
+
+    final String path = inferTablePath(catalogPathStr, tablePath);
+    Map<String, String> options = TableOptionProperties.loadFromProperties(path, hadoopConf);
+    final Schema latestSchema = getLatestTableSchema(path);
+    if (latestSchema == null) {
+      throw new TableNotExistException(getName(), tablePath);
+    }
+
+    org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder()
+        .fromRowDataType(AvroSchemaConverter.convertToDataType(latestSchema));
+    final String pkConstraintName = TableOptionProperties.getPkConstraintName(options);
+    if (pkConstraintName != null) {
+      builder.primaryKeyNamed(pkConstraintName, TableOptionProperties.getPkColumns(options));
+    }
+    final org.apache.flink.table.api.Schema schema = builder.build();
+    return CatalogTable.of(
+        schema,
+        TableOptionProperties.getComment(options),
+        TableOptionProperties.getPartitionColumns(options),
+        TableOptionProperties.getTableOptions(options));
+  }
+
+  /**
+   * Initializes the hoodie table under the inferred table path and persists the table options.
+   *
+   * @throws CatalogException if the primary key is missing or the table path can not be initialized
+   */
+  @Override
+  public void createTable(ObjectPath tablePath, CatalogBaseTable catalogTable, boolean ignoreIfExists)
+      throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+    if (!databaseExists(tablePath.getDatabaseName())) {
+      throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
+    }
+    if (tableExists(tablePath)) {
+      if (ignoreIfExists) {
+        return;
+      }
+      throw new TableAlreadyExistException(getName(), tablePath);
+    }
+
+    if (catalogTable instanceof CatalogView) {
+      throw new UnsupportedOperationException(
+          "Hudi catalog doesn't support to CREATE VIEW.");
+    }
+
+    ResolvedCatalogTable resolvedTable = (ResolvedCatalogTable) catalogTable;
+    final String tablePathStr = inferTablePath(catalogPathStr, tablePath);
+    Map<String, String> options = applyOptionsHook(tablePathStr, catalogTable.getOptions());
+    Configuration conf = Configuration.fromMap(options);
+    conf.setString(FlinkOptions.PATH, tablePathStr);
+    ResolvedSchema resolvedSchema = resolvedTable.getResolvedSchema();
+    if (!resolvedSchema.getPrimaryKey().isPresent()) {
+      throw new CatalogException("Primary key definition is missing");
+    }
+    final String avroSchema = AvroSchemaConverter.convertToSchema(resolvedSchema.toPhysicalRowDataType().getLogicalType()).toString();
+    conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);
+
+    // stores two copies of options:
+    // - partition keys
+    // - primary keys
+    // because the HoodieTableMetaClient is a heavy impl, we try to avoid initializing it
+    // when calling #getTable.
+
+    final String pkColumns = String.join(",", resolvedSchema.getPrimaryKey().get().getColumns());
+    conf.setString(FlinkOptions.RECORD_KEY_FIELD, pkColumns);
+    options.put(TableOptionProperties.PK_CONSTRAINT_NAME, resolvedSchema.getPrimaryKey().get().getName());
+    options.put(TableOptionProperties.PK_COLUMNS, pkColumns);
+
+    if (resolvedTable.isPartitioned()) {
+      final String partitions = String.join(",", resolvedTable.getPartitionKeys());
+      conf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions);
+      options.put(TableOptionProperties.PARTITION_COLUMNS, partitions);
+    }
+    conf.setString(FlinkOptions.TABLE_NAME, tablePath.getObjectName());
+    try {
+      StreamerUtil.initTableIfNotExists(conf);
+      // prepare the non-table-options properties
+      final String comment = resolvedTable.getComment();
+      if (comment != null) {
+        // the options end up in a java.util.Properties which rejects null values,
+        // so a table declared without comment must not store the key at all
+        options.put(TableOptionProperties.COMMENT, comment);
+      }
+      TableOptionProperties.createProperties(tablePathStr, hadoopConf, options);
+    } catch (IOException e) {
+      throw new CatalogException(String.format("Initialize table path %s exception.", tablePathStr), e);
+    }
+  }
+
+  @Override
+  public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+    return StreamerUtil.tableExists(inferTablePath(catalogPathStr, tablePath), hadoopConf);
+  }
+
+  /**
+   * Recursively deletes the table directory.
+   */
+  @Override
+  public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+      throws TableNotExistException, CatalogException {
+    if (!tableExists(tablePath)) {
+      if (ignoreIfNotExists) {
+        return;
+      }
+      throw new TableNotExistException(getName(), tablePath);
+    }
+
+    Path path = new Path(inferTablePath(catalogPathStr, tablePath));
+    try {
+      this.fs.delete(path, true);
+    } catch (IOException e) {
+      throw new CatalogException(String.format("Dropping table %s exception.", tablePath), e);
+    }
+  }
+
+  @Override
+  public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+      throws TableNotExistException, TableAlreadyExistException, CatalogException {
+    throw new UnsupportedOperationException("renameTable is not implemented.");
+  }
+
+  @Override
+  public void alterTable(ObjectPath tablePath, CatalogBaseTable catalogBaseTable, boolean ignoreIfNotExists)
+      throws TableNotExistException, CatalogException {
+    throw new UnsupportedOperationException("alterTable is not implemented.");
+  }
+
+  @Override
+  public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
+    return Collections.emptyList();
+  }
+
+  // ------ partitions (not tracked by this catalog) ------
+
+  @Override
+  public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+      throws TableNotExistException, TableNotPartitionedException, CatalogException {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec)
+      throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
+      throws TableNotExistException, TableNotPartitionedException, CatalogException {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec)
+      throws PartitionNotExistException, CatalogException {
+    // #partitionExists always answers false, so signal the absence through the declared
+    // exception instead of handing a null reference to the caller
+    throw new PartitionNotExistException(getName(), tablePath, catalogPartitionSpec);
+  }
+
+  @Override
+  public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec) throws CatalogException {
+    return false;
+  }
+
+  @Override
+  public void createPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, CatalogPartition catalogPartition, boolean ignoreIfExists)
+      throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
+    throw new UnsupportedOperationException("createPartition is not implemented.");
+  }
+
+  @Override
+  public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, boolean ignoreIfNotExists)
+      throws PartitionNotExistException, CatalogException {
+    throw new UnsupportedOperationException("dropPartition is not implemented.");
+  }
+
+  @Override
+  public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, CatalogPartition catalogPartition, boolean ignoreIfNotExists)
+      throws PartitionNotExistException, CatalogException {
+    throw new UnsupportedOperationException("alterPartition is not implemented.");
+  }
+
+  // ------ functions (not tracked by this catalog) ------
+
+  @Override
+  public List<String> listFunctions(String databaseName) throws DatabaseNotExistException, CatalogException {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
+    // #functionExists always answers false, so signal the absence through the declared
+    // exception instead of handing a null reference to the caller
+    throw new FunctionNotExistException(getName(), functionPath);
+  }
+
+  @Override
+  public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+    return false;
+  }
+
+  @Override
+  public void createFunction(ObjectPath functionPath, CatalogFunction catalogFunction, boolean ignoreIfExists)
+      throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+    throw new UnsupportedOperationException("createFunction is not implemented.");
+  }
+
+  @Override
+  public void alterFunction(ObjectPath functionPath, CatalogFunction catalogFunction, boolean ignoreIfNotExists)
+      throws FunctionNotExistException, CatalogException {
+    throw new UnsupportedOperationException("alterFunction is not implemented.");
+  }
+
+  @Override
+  public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+      throws FunctionNotExistException, CatalogException {
+    throw new UnsupportedOperationException("dropFunction is not implemented.");
+  }
+
+  // ------ statistics (always unknown) ------
+
+  @Override
+  public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+      throws TableNotExistException, CatalogException {
+    return CatalogTableStatistics.UNKNOWN;
+  }
+
+  @Override
+  public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
+      throws TableNotExistException, CatalogException {
+    return CatalogColumnStatistics.UNKNOWN;
+  }
+
+  @Override
+  public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec)
+      throws PartitionNotExistException, CatalogException {
+    return CatalogTableStatistics.UNKNOWN;
+  }
+
+  @Override
+  public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec)
+      throws PartitionNotExistException, CatalogException {
+    return CatalogColumnStatistics.UNKNOWN;
+  }
+
+  @Override
+  public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics catalogTableStatistics, boolean ignoreIfNotExists)
+      throws TableNotExistException, CatalogException {
+    throw new UnsupportedOperationException("alterTableStatistics is not implemented.");
+  }
+
+  @Override
+  public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics catalogColumnStatistics, boolean ignoreIfNotExists)
+      throws TableNotExistException, CatalogException, TablePartitionedException {
+    throw new UnsupportedOperationException("alterTableColumnStatistics is not implemented.");
+  }
+
+  @Override
+  public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, CatalogTableStatistics catalogTableStatistics, boolean ignoreIfNotExists)
+      throws PartitionNotExistException, CatalogException {
+    throw new UnsupportedOperationException("alterPartitionStatistics is not implemented.");
+  }
+
+  @Override
+  public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec catalogPartitionSpec, CatalogColumnStatistics catalogColumnStatistics, boolean ignoreIfNotExists)
+      throws PartitionNotExistException, CatalogException {
+    throw new UnsupportedOperationException("alterPartitionColumnStatistics is not implemented.");
+  }
+
+  // ------ utilities ------
+
+  /**
+   * Returns the names of the directories that are direct children of {@code parent}.
+   */
+  private List<String> listSubDirectories(Path parent) throws IOException {
+    return Arrays.stream(fs.listStatus(parent))
+        .filter(FileStatus::isDirectory)
+        .map(fileStatus -> fileStatus.getPath().getName())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Resolves the latest avro schema of the table under {@code path}.
+   *
+   * @return the schema, or null if the table does not exist or the schema can not be resolved
+   */
+  private @Nullable Schema getLatestTableSchema(String path) {
+    if (path != null && StreamerUtil.tableExists(path, hadoopConf)) {
+      try {
+        HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(path, hadoopConf);
+        return new TableSchemaResolver(metaClient).getTableAvroSchema(false); // change log mode is not supported now
+      } catch (Throwable throwable) {
+        // best effort: the failure is logged and reported to the caller as a null schema
+        LOG.warn("Error while resolving the latest table schema.", throwable);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns a copy of the table options with the connector forced to hudi, the table path
+   * filled in when absent, and the catalog common options added for the keys not yet set.
+   */
+  private Map<String, String> applyOptionsHook(String tablePath, Map<String, String> options) {
+    Map<String, String> newOptions = new HashMap<>(options);
+    newOptions.put("connector", "hudi");
+    newOptions.computeIfAbsent(FlinkOptions.PATH.key(), k -> tablePath);
+    tableCommonOptions.forEach(newOptions::putIfAbsent);
+    return newOptions;
+  }
+
+  /** Table path strategy: ${catalog.path}/${db_name}/${table_name}. */
+  private String inferTablePath(String catalogPath, ObjectPath tablePath) {
+    return String.format("%s/%s/%s", catalogPath, tablePath.getDatabaseName(), tablePath.getObjectName());
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java b/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java
new file mode 100644
index 0000000000000..8ab632ba5a5aa
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
+import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
+
+/**
+ * A catalog factory impl that creates {@link HoodieCatalog}.
+ */
+public class HoodieCatalogFactory implements CatalogFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieCatalogFactory.class);
+
+  /** Factory identifier returned by {@link #factoryIdentifier()}. */
+  public static final String IDENTIFIER = "hudi";
+
+  @Override
+  public String factoryIdentifier() {
+    return IDENTIFIER;
+  }
+
+  /**
+   * Validates the catalog options against the declared required/optional options
+   * and creates the {@link HoodieCatalog}.
+   */
+  @Override
+  public Catalog createCatalog(Context context) {
+    final FactoryUtil.CatalogFactoryHelper helper =
+        FactoryUtil.createCatalogFactoryHelper(this, context);
+    helper.validate();
+
+    return new HoodieCatalog(
+        context.getName(),
+        (Configuration) helper.getOptions());
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    // only the catalog path has no default value and must be given explicitly
+    Set<ConfigOption<?>> options = new HashSet<>();
+    options.add(CATALOG_PATH);
+    return options;
+  }
+
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    // the default database declares a default value, so it is optional rather than required
+    Set<ConfigOption<?>> options = new HashSet<>();
+    options.add(DEFAULT_DATABASE);
+    return options;
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java b/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
new file mode 100644
index 0000000000000..ba6ca4efd726c
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.catalog; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME; + +/** + * Helper class to read/write flink table options as a map. 
+ */
+public class TableOptionProperties {
+  private static final Logger LOG = LoggerFactory.getLogger(TableOptionProperties.class);
+
+  private static final String FILE_NAME = "table_option.properties";
+
+  public static final String PK_CONSTRAINT_NAME = "pk.constraint.name";
+  public static final String PK_COLUMNS = "pk.columns";
+  public static final String COMMENT = "comment";
+  public static final String PARTITION_COLUMNS = "partition.columns";
+
+  /** Keys kept in the properties file that {@link #getTableOptions(Map)} strips off. */
+  public static final List<String> NON_OPTION_KEYS =
+      Collections.unmodifiableList(Arrays.asList(PK_CONSTRAINT_NAME, PK_COLUMNS, COMMENT, PARTITION_COLUMNS));
+
+  /**
+   * Initialize the {@link #FILE_NAME} meta file.
+   *
+   * <p>Entries with a null key or value are left out, because {@link Properties} can not hold them.
+   *
+   * @param basePath   The table base path
+   * @param hadoopConf The hadoop configuration used to resolve the file system
+   * @param options    The options to persist
+   * @throws IOException if the file can not be written
+   */
+  public static void createProperties(String basePath,
+                                      Configuration hadoopConf,
+                                      Map<String, String> options) throws IOException {
+    Path propertiesFilePath = getPropertiesFilePath(basePath);
+    FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
+    // build the properties before the file is created, so that a bad entry
+    // can never leave an empty file behind
+    Properties properties = new Properties();
+    options.forEach((key, value) -> {
+      if (key != null && value != null) {
+        properties.setProperty(key, value);
+      }
+    });
+    try (FSDataOutputStream outputStream = fs.create(propertiesFilePath)) {
+      properties.store(outputStream,
+          "Table option properties saved on " + new Date());
+    }
+    LOG.info("Create file {} success.", propertiesFilePath);
+  }
+
+  /**
+   * Read table options map from the given table base path.
+   *
+   * @param basePath   The table base path
+   * @param hadoopConf The hadoop configuration used to resolve the file system
+   * @return all the entries of the properties file, {@link #NON_OPTION_KEYS} included
+   * @throws HoodieIOException if the file can not be read
+   */
+  public static Map<String, String> loadFromProperties(String basePath, Configuration hadoopConf) {
+    Path propertiesFilePath = getPropertiesFilePath(basePath);
+    Map<String, String> options = new HashMap<>();
+    Properties props = new Properties();
+
+    FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
+    try (FSDataInputStream inputStream = fs.open(propertiesFilePath)) {
+      props.load(inputStream);
+      for (final String name : props.stringPropertyNames()) {
+        options.put(name, props.getProperty(name));
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException(String.format("Could not load table option properties from %s", propertiesFilePath), e);
+    }
+    LOG.info("Loading table option properties from {} success.", propertiesFilePath);
+    return options;
+  }
+
+  /** The properties file lives in the auxiliary folder under the table base path. */
+  private static Path getPropertiesFilePath(String basePath) {
+    String auxPath = basePath + Path.SEPARATOR + AUXILIARYFOLDER_NAME;
+    return new Path(auxPath, FILE_NAME);
+  }
+
+  /** Returns the primary key constraint name, or null if none is stored. */
+  public static String getPkConstraintName(Map<String, String> options) {
+    return options.get(PK_CONSTRAINT_NAME);
+  }
+
+  /** Returns the primary key columns, or an empty list if none are stored. */
+  public static List<String> getPkColumns(Map<String, String> options) {
+    return splitColumns(options, PK_COLUMNS);
+  }
+
+  /** Returns the partition columns, or an empty list if none are stored. */
+  public static List<String> getPartitionColumns(Map<String, String> options) {
+    return splitColumns(options, PARTITION_COLUMNS);
+  }
+
+  /** Returns the table comment, or null if none is stored. */
+  public static String getComment(Map<String, String> options) {
+    return options.get(COMMENT);
+  }
+
+  /** Returns a copy of the given options without the {@link #NON_OPTION_KEYS}. */
+  public static Map<String, String> getTableOptions(Map<String, String> options) {
+    Map<String, String> copied = new HashMap<>(options);
+    NON_OPTION_KEYS.forEach(copied::remove);
+    return copied;
+  }
+
+  /**
+   * Splits the comma separated column list stored under {@code key}; a missing
+   * or empty value yields an empty list instead of a single blank column.
+   */
+  private static List<String> splitColumns(Map<String, String> options, String key) {
+    final String columns = options.get(key);
+    if (columns == null || columns.isEmpty()) {
+      return Collections.emptyList();
+    }
+    return Arrays.stream(columns.split(",")).collect(Collectors.toList());
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 28a70b95f3ef4..5666ef8fa989b 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -257,6 +257,7 @@ public static HoodieTableMetaClient initTableIfNotExists(Configuration conf) thr final org.apache.hadoop.conf.Configuration hadoopConf = StreamerUtil.getHadoopConf(); if (!tableExists(basePath, hadoopConf)) { HoodieTableMetaClient metaClient = HoodieTableMetaClient.withPropertyBuilder() + .setTableCreateSchema(conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA)) .setTableType(conf.getString(FlinkOptions.TABLE_TYPE)) .setTableName(conf.getString(FlinkOptions.TABLE_NAME)) .setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null)) diff --git a/hudi-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/hudi-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 1031e4274d54c..47435c745c461 100644 --- a/hudi-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/hudi-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -15,3 +15,4 @@ # limitations under the License. org.apache.hudi.table.HoodieTableFactory +org.apache.hudi.table.catalog.HoodieCatalogFactory diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java b/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java new file mode 100644 index 0000000000000..18a2369ffd4e5 --- /dev/null +++ b/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.catalog; + +import org.apache.hudi.configuration.FlinkOptions; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.junit.jupiter.api.BeforeEach; +import 
org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH; +import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test cases for {@link HoodieCatalog}. + */ +public class TestHoodieCatalog { + + private static final String TEST_DEFAULT_DATABASE = "test_db"; + private static final String NONE_EXIST_DATABASE = "none_exist_database"; + private static final List CREATE_COLUMNS = Arrays.asList( + Column.physical("uuid", DataTypes.VARCHAR(20)), + Column.physical("name", DataTypes.VARCHAR(20)), + Column.physical("age", DataTypes.INT()), + Column.physical("tss", DataTypes.TIMESTAMP(3)), + Column.physical("partition", DataTypes.VARCHAR(10)) + ); + private static final UniqueConstraint CONSTRAINTS = UniqueConstraint.primaryKey("uuid", Arrays.asList("uuid")); + private static final ResolvedSchema CREATE_TABLE_SCHEMA = + new ResolvedSchema( + CREATE_COLUMNS, + Collections.emptyList(), + CONSTRAINTS); + + private static final List EXPECTED_TABLE_COLUMNS = + CREATE_COLUMNS.stream() + .map( + col -> { + // Flink char/varchar is transform to string in avro. 
+ if (col.getDataType() + .getLogicalType() + .getTypeRoot() + .equals(LogicalTypeRoot.VARCHAR)) { + return Column.physical(col.getName(), DataTypes.STRING()); + } else { + return col; + } + }) + .collect(Collectors.toList()); + private static final ResolvedSchema EXPECTED_TABLE_SCHEMA = + new ResolvedSchema(EXPECTED_TABLE_COLUMNS, Collections.emptyList(), CONSTRAINTS); + + private static final Map EXPECTED_OPTIONS = new HashMap<>(); + static { + EXPECTED_OPTIONS.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + EXPECTED_OPTIONS.put(FlinkOptions.INDEX_GLOBAL_ENABLED.key(), "false"); + EXPECTED_OPTIONS.put(FlinkOptions.PRE_COMBINE.key(), "true"); + } + + private static final ResolvedCatalogTable EXPECTED_CATALOG_TABLE = new ResolvedCatalogTable( + CatalogTable.of( + Schema.newBuilder().fromResolvedSchema(CREATE_TABLE_SCHEMA).build(), + "test", + Arrays.asList("partition"), + EXPECTED_OPTIONS), + CREATE_TABLE_SCHEMA + ); + + private TableEnvironment streamTableEnv; + private HoodieCatalog catalog; + + @TempDir + File tempFile; + + @BeforeEach + void beforeEach() { + EnvironmentSettings settings = EnvironmentSettings.newInstance().build(); + streamTableEnv = TableEnvironmentImpl.create(settings); + streamTableEnv.getConfig().getConfiguration() + .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2); + File testDb = new File(tempFile, TEST_DEFAULT_DATABASE); + testDb.mkdir(); + Map catalogOptions = new HashMap<>(); + catalogOptions.put(CATALOG_PATH.key(), tempFile.getAbsolutePath()); + catalogOptions.put(DEFAULT_DATABASE.key(), TEST_DEFAULT_DATABASE); + catalog = new HoodieCatalog("hudi", Configuration.fromMap(catalogOptions)); + catalog.open(); + } + + @Test + public void testListDatabases() { + List actual = catalog.listDatabases(); + assertTrue(actual.contains(TEST_DEFAULT_DATABASE)); + assertFalse(actual.contains(NONE_EXIST_DATABASE)); + } + + @Test + public void testDatabaseExists() { + 
assertTrue(catalog.databaseExists(TEST_DEFAULT_DATABASE)); + assertFalse(catalog.databaseExists(NONE_EXIST_DATABASE)); + } + + @Test + public void testCreateAndDropDatabase() throws Exception { + CatalogDatabase expected = new CatalogDatabaseImpl(Collections.emptyMap(), null); + catalog.createDatabase("db1", expected, true); + + CatalogDatabase actual = catalog.getDatabase("db1"); + assertTrue(catalog.listDatabases().contains("db1")); + assertEquals(expected.getProperties(), actual.getProperties()); + + // drop exist database + catalog.dropDatabase("db1", true); + assertFalse(catalog.listDatabases().contains("db1")); + + // drop non-exist database + assertThrows(DatabaseNotExistException.class, + () -> catalog.dropDatabase(NONE_EXIST_DATABASE, false)); + } + + @Test + public void testCreateDatabaseWithOptions() { + Map options = new HashMap<>(); + options.put("k1", "v1"); + options.put("k2", "v2"); + + assertThrows( + CatalogException.class, + () -> catalog.createDatabase("db1", new CatalogDatabaseImpl(options, null), true)); + } + + @Test + public void testCreateTable() throws Exception { + ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1"); + // test create table + catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true); + + // test table exist + assertTrue(catalog.tableExists(tablePath)); + + // test create exist table + assertThrows(TableAlreadyExistException.class, + () -> catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, false)); + } + + @Test + public void testListTable() throws Exception { + ObjectPath tablePath1 = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1"); + ObjectPath tablePath2 = new ObjectPath(TEST_DEFAULT_DATABASE, "tb2"); + + // create table + catalog.createTable(tablePath1, EXPECTED_CATALOG_TABLE, true); + catalog.createTable(tablePath2, EXPECTED_CATALOG_TABLE, true); + + // test list table + List tables = catalog.listTables(TEST_DEFAULT_DATABASE); + assertTrue(tables.contains(tablePath1.getObjectName())); + 
assertTrue(tables.contains(tablePath2.getObjectName())); + + // test list non-exist database table + assertThrows(DatabaseNotExistException.class, + () -> catalog.listTables(NONE_EXIST_DATABASE)); + } + + @Test + public void testGetTable() throws Exception { + ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1"); + // create table + catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true); + + Map expectedOptions = new HashMap<>(EXPECTED_OPTIONS); + expectedOptions.put(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ); + expectedOptions.put(FlinkOptions.INDEX_GLOBAL_ENABLED.key(), "false"); + expectedOptions.put(FlinkOptions.PRE_COMBINE.key(), "true"); + expectedOptions.put("connector", "hudi"); + expectedOptions.put( + FlinkOptions.PATH.key(), + String.format("%s/%s/%s", tempFile.getAbsolutePath(), tablePath.getDatabaseName(), tablePath.getObjectName())); + + // test get table + CatalogBaseTable actualTable = catalog.getTable(tablePath); + // validate schema + Schema actualSchema = actualTable.getUnresolvedSchema(); + Schema expectedSchema = Schema.newBuilder().fromResolvedSchema(EXPECTED_TABLE_SCHEMA).build(); + assertEquals(expectedSchema, actualSchema); + // validate options + Map actualOptions = actualTable.getOptions(); + assertEquals(expectedOptions, actualOptions); + // validate comment + assertEquals(EXPECTED_CATALOG_TABLE.getComment(), actualTable.getComment()); + // validate partition key + assertEquals(EXPECTED_CATALOG_TABLE.getPartitionKeys(),((CatalogTable) actualTable).getPartitionKeys()); + } + + @Test + public void dropTable() throws Exception { + ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1"); + // create table + catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true); + + // test drop table + catalog.dropTable(tablePath, true); + assertFalse(catalog.tableExists(tablePath)); + + // drop non-exist table + assertThrows(TableNotExistException.class, + () -> catalog.dropTable(new 
ObjectPath(TEST_DEFAULT_DATABASE, "non_exist"), false)); + } +}