diff --git a/hudi-aws/pom.xml b/hudi-aws/pom.xml index d44a389a61f66..9b279489d60b5 100644 --- a/hudi-aws/pom.xml +++ b/hudi-aws/pom.xml @@ -40,6 +40,11 @@ hudi-common ${project.version} + + org.apache.hudi + hudi-hive-sync + ${project.version} + @@ -75,6 +80,28 @@ ${dynamodb.lockclient.version} + + + ${hive.groupid} + hive-service + ${hive.version} + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-log4j12 + + + + + + org.apache.parquet + parquet-avro + + com.amazonaws @@ -103,6 +130,12 @@ io.dropwizard.metrics metrics-core + + + com.amazonaws + aws-java-sdk-glue + ${aws.sdk.version} + diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java new file mode 100644 index 0000000000000..105107bce0b1e --- /dev/null +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java @@ -0,0 +1,479 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.aws.sync; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.hive.AbstractHiveSyncHoodieClient; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.sync.common.model.Partition; + +import com.amazonaws.services.glue.AWSGlue; +import com.amazonaws.services.glue.AWSGlueClientBuilder; +import com.amazonaws.services.glue.model.AlreadyExistsException; +import com.amazonaws.services.glue.model.BatchCreatePartitionRequest; +import com.amazonaws.services.glue.model.BatchCreatePartitionResult; +import com.amazonaws.services.glue.model.BatchUpdatePartitionRequest; +import com.amazonaws.services.glue.model.BatchUpdatePartitionRequestEntry; +import com.amazonaws.services.glue.model.BatchUpdatePartitionResult; +import com.amazonaws.services.glue.model.Column; +import com.amazonaws.services.glue.model.CreateDatabaseRequest; +import com.amazonaws.services.glue.model.CreateDatabaseResult; +import com.amazonaws.services.glue.model.CreateTableRequest; +import com.amazonaws.services.glue.model.CreateTableResult; +import com.amazonaws.services.glue.model.DatabaseInput; +import com.amazonaws.services.glue.model.EntityNotFoundException; +import com.amazonaws.services.glue.model.GetDatabaseRequest; +import com.amazonaws.services.glue.model.GetPartitionsRequest; +import com.amazonaws.services.glue.model.GetPartitionsResult; +import com.amazonaws.services.glue.model.GetTableRequest; +import com.amazonaws.services.glue.model.PartitionInput; +import com.amazonaws.services.glue.model.SerDeInfo; +import com.amazonaws.services.glue.model.StorageDescriptor; +import com.amazonaws.services.glue.model.Table; +import 
com.amazonaws.services.glue.model.TableInput; +import com.amazonaws.services.glue.model.UpdateTableRequest; +import org.apache.avro.Schema; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.schema.MessageType; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.apache.hudi.aws.utils.S3Utils.s3aToS3; +import static org.apache.hudi.common.util.MapUtils.nonEmpty; +import static org.apache.hudi.hive.util.HiveSchemaUtil.getPartitionKeyType; +import static org.apache.hudi.hive.util.HiveSchemaUtil.parquetSchemaToMapSchema; +import static org.apache.hudi.sync.common.util.TableUtils.tableId; + +/** + * This class implements all the AWS APIs to enable syncing of a Hudi Table with the + * AWS Glue Data Catalog (https://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html). + */ +public class AWSGlueCatalogSyncClient extends AbstractHiveSyncHoodieClient { + + private static final Logger LOG = LogManager.getLogger(AWSGlueCatalogSyncClient.class); + private static final int MAX_PARTITIONS_PER_REQUEST = 100; + private static final long BATCH_REQUEST_SLEEP_MILLIS = 1000L; + private final AWSGlue awsGlue; + private final String databaseName; + + public AWSGlueCatalogSyncClient(HiveSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) { + super(syncConfig, hadoopConf, fs); + this.awsGlue = AWSGlueClientBuilder.standard().build(); + this.databaseName = syncConfig.databaseName; + } + + @Override + public List getAllPartitions(String tableName) { + try { + GetPartitionsRequest request = new GetPartitionsRequest(); + request.withDatabaseName(databaseName).withTableName(tableName); + GetPartitionsResult result = awsGlue.getPartitions(request); + return result.getPartitions() + .stream() + .map(p -> new Partition(p.getValues(), p.getStorageDescriptor().getLocation())) + .collect(Collectors.toList()); + } catch (Exception e) { + throw new HoodieGlueSyncException("Failed to get all partitions for table " + tableId(databaseName, tableName), e); + } + } + + @Override + public void addPartitionsToTable(String tableName, List partitionsToAdd) { + if (partitionsToAdd.isEmpty()) { + LOG.info("No partitions to add for " + tableId(databaseName, tableName)); + return; + } + LOG.info("Adding " + partitionsToAdd.size() + " partition(s) in table " + tableId(databaseName, tableName)); + try { + Table table = getTable(awsGlue, databaseName, tableName); + StorageDescriptor sd = table.getStorageDescriptor(); + List partitionInputs = partitionsToAdd.stream().map(partition -> { + StorageDescriptor partitionSd = sd.clone(); + String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString(); + List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); + partitionSd.setLocation(fullPartitionPath); + return new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd); + }).collect(Collectors.toList()); + + for (List batch : CollectionUtils.batches(partitionInputs, MAX_PARTITIONS_PER_REQUEST)) { + BatchCreatePartitionRequest request = new BatchCreatePartitionRequest(); + 
request.withDatabaseName(databaseName).withTableName(tableName).withPartitionInputList(batch); + + BatchCreatePartitionResult result = awsGlue.batchCreatePartition(request); + if (CollectionUtils.nonEmpty(result.getErrors())) { + throw new HoodieGlueSyncException("Fail to add partitions to " + tableId(databaseName, tableName) + + " with error(s): " + result.getErrors()); + } + Thread.sleep(BATCH_REQUEST_SLEEP_MILLIS); + } + } catch (Exception e) { + throw new HoodieGlueSyncException("Fail to add partitions to " + tableId(databaseName, tableName), e); + } + } + + @Override + public void updatePartitionsToTable(String tableName, List changedPartitions) { + if (changedPartitions.isEmpty()) { + LOG.info("No partitions to change for " + tableName); + return; + } + LOG.info("Updating " + changedPartitions.size() + " partition(s) in table " + tableId(databaseName, tableName)); + try { + Table table = getTable(awsGlue, databaseName, tableName); + StorageDescriptor sd = table.getStorageDescriptor(); + List updatePartitionEntries = changedPartitions.stream().map(partition -> { + StorageDescriptor partitionSd = sd.clone(); + String fullPartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition).toString(); + List partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); + partitionSd.setLocation(fullPartitionPath); + PartitionInput partitionInput = new PartitionInput().withValues(partitionValues).withStorageDescriptor(partitionSd); + return new BatchUpdatePartitionRequestEntry().withPartitionInput(partitionInput).withPartitionValueList(partitionValues); + }).collect(Collectors.toList()); + + for (List batch : CollectionUtils.batches(updatePartitionEntries, MAX_PARTITIONS_PER_REQUEST)) { + BatchUpdatePartitionRequest request = new BatchUpdatePartitionRequest(); + request.withDatabaseName(databaseName).withTableName(tableName).withEntries(batch); + + BatchUpdatePartitionResult result = awsGlue.batchUpdatePartition(request); + if (CollectionUtils.nonEmpty(result.getErrors())) { + throw new HoodieGlueSyncException("Fail to update partitions to " + tableId(databaseName, tableName) + + " with error(s): " + result.getErrors()); + } + Thread.sleep(BATCH_REQUEST_SLEEP_MILLIS); + } + } catch (Exception e) { + throw new HoodieGlueSyncException("Fail to update partitions to " + tableId(databaseName, tableName), e); + } + } + + @Override + public void dropPartitionsToTable(String tableName, List partitionsToDrop) { + throw new UnsupportedOperationException("dropPartitionsToTable is not supported yet."); + } + + /** + * Update the table properties for the table.
+ */ + @Override + public void updateTableProperties(String tableName, Map tableProperties) { + if (!nonEmpty(tableProperties)) { + return; + } + try { + updateTableParameters(awsGlue, databaseName, tableName, tableProperties, true); + } catch (Exception e) { + throw new HoodieGlueSyncException("Fail to update properties for table " + tableId(databaseName, tableName), e); + } + } + + @Override + public void updateTableDefinition(String tableName, MessageType newSchema) { + // TODO: Cascade is set in Hive meta sync, but we need to investigate how to configure it for Glue meta + boolean cascade = syncConfig.partitionFields.size() > 0; + try { + Table table = getTable(awsGlue, databaseName, tableName); + Map newSchemaMap = parquetSchemaToMapSchema(newSchema, syncConfig.supportTimestamp, false); + List newColumns = newSchemaMap.keySet().stream().map(key -> { + String keyType = getPartitionKeyType(newSchemaMap, key); + return new Column().withName(key).withType(keyType.toLowerCase()).withComment(""); + }).collect(Collectors.toList()); + StorageDescriptor sd = table.getStorageDescriptor(); + sd.setColumns(newColumns); + + final Date now = new Date(); + TableInput updatedTableInput = new TableInput() + .withName(tableName) + .withTableType(table.getTableType()) + .withParameters(table.getParameters()) + .withPartitionKeys(table.getPartitionKeys()) + .withStorageDescriptor(sd) + .withLastAccessTime(now) + .withLastAnalyzedTime(now); + + UpdateTableRequest request = new UpdateTableRequest() + .withDatabaseName(databaseName) + .withTableInput(updatedTableInput); + + awsGlue.updateTable(request); + } catch (Exception e) { + throw new HoodieGlueSyncException("Fail to update definition for table " + tableId(databaseName, tableName), e); + } + } + + @Override + public List getTableCommentUsingMetastoreClient(String tableName) { + throw new UnsupportedOperationException("Not supported: `getTableCommentUsingMetastoreClient`"); + } + + @Override + public void updateTableComments(String tableName, List oldSchema, List newSchema) { + throw new UnsupportedOperationException("Not supported: `updateTableComments`"); + } + + @Override + public void updateTableComments(String tableName, List oldSchema, Map newComments) { + throw new UnsupportedOperationException("Not supported: `updateTableComments`"); + } + + @Override + public void createTable(String tableName, + MessageType storageSchema, + String inputFormatClass, + String outputFormatClass, + String serdeClass, + Map serdeProperties, + Map tableProperties) { + if (tableExists(tableName)) { + return; + } + CreateTableRequest request = new CreateTableRequest(); + Map params = new HashMap<>(); + if (!syncConfig.createManagedTable) { + params.put("EXTERNAL", "TRUE"); + } + params.putAll(tableProperties); + + try { + Map mapSchema = parquetSchemaToMapSchema(storageSchema, syncConfig.supportTimestamp, false); + + List schemaPartitionKeys = new ArrayList<>(); + List schemaWithoutPartitionKeys = new ArrayList<>(); + for (String key : mapSchema.keySet()) { + String keyType = getPartitionKeyType(mapSchema, key); + Column column = new Column().withName(key).withType(keyType.toLowerCase()).withComment(""); + // In Glue, the full schema should exclude the partition keys + if (syncConfig.partitionFields.contains(key)) { + schemaPartitionKeys.add(column); + } else { + schemaWithoutPartitionKeys.add(column); + } + } + + StorageDescriptor storageDescriptor = new StorageDescriptor(); + serdeProperties.put("serialization.format", "1"); + storageDescriptor + .withSerdeInfo(new
SerDeInfo().withSerializationLibrary(serdeClass).withParameters(serdeProperties)) + .withLocation(s3aToS3(syncConfig.basePath)) + .withInputFormat(inputFormatClass) + .withOutputFormat(outputFormatClass) + .withColumns(schemaWithoutPartitionKeys); + + final Date now = new Date(); + TableInput tableInput = new TableInput() + .withName(tableName) + .withTableType(TableType.EXTERNAL_TABLE.toString()) + .withParameters(params) + .withPartitionKeys(schemaPartitionKeys) + .withStorageDescriptor(storageDescriptor) + .withLastAccessTime(now) + .withLastAnalyzedTime(now); + request.withDatabaseName(databaseName) + .withTableInput(tableInput); + + CreateTableResult result = awsGlue.createTable(request); + LOG.info("Created table " + tableId(databaseName, tableName) + " : " + result); + } catch (AlreadyExistsException e) { + LOG.warn("Table " + tableId(databaseName, tableName) + " already exists.", e); + } catch (Exception e) { + throw new HoodieGlueSyncException("Fail to create " + tableId(databaseName, tableName), e); + } + } + + @Override + public Map getTableSchema(String tableName) { + try { + // GlueMetastoreClient returns partition keys separate from Columns, hence get both and merge to + // get the Schema of the table. + Table table = getTable(awsGlue, databaseName, tableName); + Map partitionKeysMap = + table.getPartitionKeys().stream().collect(Collectors.toMap(Column::getName, f -> f.getType().toUpperCase())); + + Map columnsMap = + table.getStorageDescriptor().getColumns().stream().collect(Collectors.toMap(Column::getName, f -> f.getType().toUpperCase())); + + Map schema = new HashMap<>(); + schema.putAll(columnsMap); + schema.putAll(partitionKeysMap); + return schema; + } catch (Exception e) { + throw new HoodieGlueSyncException("Fail to get schema for table " + tableId(databaseName, tableName), e); + } + } + + @Override + public boolean doesTableExist(String tableName) { + return tableExists(tableName); + } + + @Override + public boolean tableExists(String tableName) { + GetTableRequest request = new GetTableRequest() + .withDatabaseName(databaseName) + .withName(tableName); + try { + return Objects.nonNull(awsGlue.getTable(request).getTable()); + } catch (EntityNotFoundException e) { + LOG.info("Table not found: " + tableId(databaseName, tableName), e); + return false; + } catch (Exception e) { + throw new HoodieGlueSyncException("Fail to get table: " + tableId(databaseName, tableName), e); + } + } + + @Override + public boolean databaseExists(String databaseName) { + GetDatabaseRequest request = new GetDatabaseRequest(); + request.setName(databaseName); + try { + return Objects.nonNull(awsGlue.getDatabase(request).getDatabase()); + } catch (EntityNotFoundException e) { + LOG.info("Database not found: " + databaseName, e); + return false; + } catch (Exception e) { + throw new HoodieGlueSyncException("Fail to check if database exists " + databaseName, e); + } + } + + @Override + public void createDatabase(String databaseName) { + if (databaseExists(databaseName)) { + return; + } + CreateDatabaseRequest request = new CreateDatabaseRequest(); + request.setDatabaseInput(new DatabaseInput() + .withName(databaseName) + .withDescription("Automatically created by " + this.getClass().getName()) + .withParameters(null) + .withLocationUri(null)); + try { + CreateDatabaseResult result = awsGlue.createDatabase(request); + LOG.info("Successfully created database in AWS Glue: " + result.toString()); + } catch (AlreadyExistsException e) { + LOG.warn("AWS Glue Database " + databaseName + " already 
exists", e); + } catch (Exception e) { + throw new HoodieGlueSyncException("Fail to create database " + databaseName, e); + } + } + + @Override + public Option getLastCommitTimeSynced(String tableName) { + try { + Table table = getTable(awsGlue, databaseName, tableName); + return Option.of(table.getParameters().getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, null)); + } catch (Exception e) { + throw new HoodieGlueSyncException("Fail to get last sync commit time for " + tableId(databaseName, tableName), e); + } + } + + @Override + public void close() { + awsGlue.shutdown(); + } + + @Override + public void updateLastCommitTimeSynced(String tableName) { + if (!activeTimeline.lastInstant().isPresent()) { + LOG.warn("No commit in active timeline."); + return; + } + final String lastCommitTimestamp = activeTimeline.lastInstant().get().getTimestamp(); + try { + updateTableParameters(awsGlue, databaseName, tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitTimestamp), false); + } catch (Exception e) { + throw new HoodieGlueSyncException("Fail to update last sync commit time for " + tableId(databaseName, tableName), e); + } + } + + @Override + public Option getLastReplicatedTime(String tableName) { + throw new UnsupportedOperationException("Not supported: `getLastReplicatedTime`"); + } + + @Override + public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) { + throw new UnsupportedOperationException("Not supported: `updateLastReplicatedTimeStamp`"); + } + + @Override + public void deleteLastReplicatedTimeStamp(String tableName) { + throw new UnsupportedOperationException("Not supported: `deleteLastReplicatedTimeStamp`"); + } + + private enum TableType { + MANAGED_TABLE, + EXTERNAL_TABLE, + VIRTUAL_VIEW, + INDEX_TABLE, + MATERIALIZED_VIEW + } + + private static Table getTable(AWSGlue awsGlue, String databaseName, String tableName) throws HoodieGlueSyncException { + GetTableRequest request = new GetTableRequest() + .withDatabaseName(databaseName) + .withName(tableName); + try { + return awsGlue.getTable(request).getTable(); + } catch (EntityNotFoundException e) { + throw new HoodieGlueSyncException("Table not found: " + tableId(databaseName, tableName), e); + } catch (Exception e) { + throw new HoodieGlueSyncException("Fail to get table " + tableId(databaseName, tableName), e); + } + } + + private static void updateTableParameters(AWSGlue awsGlue, String databaseName, String tableName, Map updatingParams, boolean shouldReplace) { + final Map newParams = new HashMap<>(); + try { + Table table = getTable(awsGlue, databaseName, tableName); + if (!shouldReplace) { + newParams.putAll(table.getParameters()); + } + newParams.putAll(updatingParams); + + final Date now = new Date(); + TableInput updatedTableInput = new TableInput() + .withName(tableName) + .withTableType(table.getTableType()) + .withParameters(newParams) + .withPartitionKeys(table.getPartitionKeys()) + .withStorageDescriptor(table.getStorageDescriptor()) + .withLastAccessTime(now) + .withLastAnalyzedTime(now); + + UpdateTableRequest request = new UpdateTableRequest(); + request.withDatabaseName(databaseName) + .withTableInput(updatedTableInput); + awsGlue.updateTable(request); + } catch (Exception e) { + throw new HoodieGlueSyncException("Fail to update params for table " + tableId(databaseName, tableName) + ": " + newParams, e); + } + } +} diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java 
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java new file mode 100644 index 0000000000000..5a701f12c83c4 --- /dev/null +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.aws.sync; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.hive.HiveSyncConfig; +import org.apache.hudi.hive.HiveSyncTool; + +import com.beust.jcommander.JCommander; +import jdk.jfr.Experimental; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.conf.HiveConf; + +/** + * Currently Experimental. Utility class that implements syncing a Hudi Table with the + * AWS Glue Data Catalog (https://docs.aws.amazon.com/glue/latest/dg/populate-data-catalog.html) + * to enable querying via Glue ETLs, Athena, etc. + * + * Extends HiveSyncTool since most logic is similar to Hive syncing, + * except that it uses a different client {@link AWSGlueCatalogSyncClient} that implements + * the necessary functionality using Glue APIs. + */ +@Experimental +public class AwsGlueCatalogSyncTool extends HiveSyncTool { + + public AwsGlueCatalogSyncTool(TypedProperties props, Configuration conf, FileSystem fs) { + super(props, new HiveConf(conf, HiveConf.class), fs); + } + + private AwsGlueCatalogSyncTool(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf, FileSystem fs) { + super(hiveSyncConfig, hiveConf, fs); + } + + @Override + protected void initClient(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf) { + hoodieHiveClient = new AWSGlueCatalogSyncClient(hiveSyncConfig, hiveConf, fs); + } + + public static void main(String[] args) { + // parse the params + final HiveSyncConfig cfg = new HiveSyncConfig(); + JCommander cmd = new JCommander(cfg, null, args); + if (cfg.help || args.length == 0) { + cmd.usage(); + System.exit(1); + } + FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration()); + HiveConf hiveConf = new HiveConf(); + hiveConf.addResource(fs.getConf()); + new AwsGlueCatalogSyncTool(cfg, hiveConf, fs).syncHoodieTable(); + } +} diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/HoodieGlueSyncException.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/HoodieGlueSyncException.java new file mode 100644 index 0000000000000..5b788ebf317ee --- /dev/null +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/HoodieGlueSyncException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.aws.sync; + +import org.apache.hudi.hive.HoodieHiveSyncException; + +public class HoodieGlueSyncException extends HoodieHiveSyncException { + + public HoodieGlueSyncException(String message) { + super(message); + } + + public HoodieGlueSyncException(String message, Throwable t) { + super(message, t); + } +} diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/utils/S3Utils.java b/hudi-aws/src/main/java/org/apache/hudi/aws/utils/S3Utils.java new file mode 100644 index 0000000000000..bfb208ee15058 --- /dev/null +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/utils/S3Utils.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.aws.utils; + +public final class S3Utils { + + public static String s3aToS3(String s3aUrl) { + return s3aUrl.replaceFirst("(?i)^s3a://", "s3://"); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java index 9741ceef3ede3..56739217216d2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java @@ -22,6 +22,7 @@ import java.lang.reflect.Array; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -32,12 +33,21 @@ import java.util.Properties; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; public class CollectionUtils { public static final Properties EMPTY_PROPERTIES = new Properties(); + public static boolean isNullOrEmpty(Collection c) { + return Objects.isNull(c) || c.isEmpty(); + } + + public static boolean nonEmpty(Collection c) { + return !isNullOrEmpty(c); + } + /** * Combines provided arrays into one */ @@ -105,6 +115,21 @@ public static List diff(List one, List another) { return diff; } + public static Stream> batchesAsStream(List list, int batchSize) { + ValidationUtils.checkArgument(batchSize > 0, "batch size must be positive."); + int total = list.size(); + if (total <= 0) { + return Stream.empty(); + } + int numFullBatches = (total - 1) / batchSize; + return IntStream.range(0, numFullBatches + 1).mapToObj( + n -> list.subList(n * batchSize, n == numFullBatches ? total : (n + 1) * batchSize)); + } + + public static List> batches(List list, int batchSize) { + return batchesAsStream(list, batchSize).collect(Collectors.toList()); + } + /** * Determines whether two iterators contain equal elements in the same order. More specifically, * this method returns {@code true} if {@code iterator1} and {@code iterator2} contain the same diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/MapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/MapUtils.java new file mode 100644 index 0000000000000..c39f6fd74f424 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/MapUtils.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.common.util; + +import java.util.Map; +import java.util.Objects; + +public class MapUtils { + + public static boolean isNullOrEmpty(Map m) { + return Objects.isNull(m) || m.isEmpty(); + } + + public static boolean nonEmpty(Map m) { + return !isNullOrEmpty(m); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCollectionUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCollectionUtils.java new file mode 100644 index 0000000000000..53ca9b2bebc1f --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCollectionUtils.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.util; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.hudi.common.util.CollectionUtils.batches; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class TestCollectionUtils { + + @Test + void getBatchesFromList() { + assertThrows(IllegalArgumentException.class, () -> { + batches(Collections.emptyList(), -1); + }); + + assertThrows(IllegalArgumentException.class, () -> { + batches(Collections.emptyList(), 0); + }); + + assertEquals(Collections.emptyList(), batches(Collections.emptyList(), 1)); + + List> intsBatches1 = batches(Arrays.asList(1, 2, 3, 4, 5, 6), 3); + assertEquals(2, intsBatches1.size()); + assertEquals(Arrays.asList(1, 2, 3), intsBatches1.get(0)); + assertEquals(Arrays.asList(4, 5, 6), intsBatches1.get(1)); + + List> intsBatches2 = batches(Arrays.asList(1, 2, 3, 4, 5, 6), 5); + assertEquals(2, intsBatches2.size()); + assertEquals(Arrays.asList(1, 2, 3, 4, 5), intsBatches2.get(0)); + assertEquals(Collections.singletonList(6), intsBatches2.get(1)); + } +} diff --git a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java index 65a1d8ae2ddba..934dbadf1c750 100644 --- a/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java +++ b/hudi-kafka-connect/src/main/java/org/apache/hudi/connect/writers/KafkaConnectTransactionServices.java @@ -163,7 +163,9 @@ private void syncMeta() { Arrays.asList(connectConfigs.getMetaSyncClasses().split(","))); FileSystem fs = FSUtils.getFs(tableBasePath, new Configuration()); for (String impl : syncClientToolClasses) { - SyncUtilHelpers.runHoodieMetaSync(impl.trim(), connectConfigs.getProps(), hadoopConf, fs, tableBasePath, HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue()); + // TODO kafka connect config 
needs to support setting base file format + String baseFileFormat = connectConfigs.getStringOrDefault(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT); + SyncUtilHelpers.runHoodieMetaSync(impl.trim(), connectConfigs.getProps(), hadoopConf, fs, tableBasePath, baseFileFormat); } } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index fc83cebc945d4..f9da1923613e9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -562,6 +562,7 @@ object HoodieSparkSqlWriter { if (metaSyncEnabled) { val fs = basePath.getFileSystem(spark.sessionState.newHadoopConf()) + val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT); val properties = new TypedProperties() properties.putAll(hoodieConfig.getProps) properties.put(HiveSyncConfig.HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key, spark.sessionState.conf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD).toString) @@ -572,7 +573,7 @@ object HoodieSparkSqlWriter { hiveConf.addResource(fs.getConf) syncClientToolClassSet.foreach(impl => { - SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, hiveConf, fs, basePath.toString, HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue) + SyncUtilHelpers.runHoodieMetaSync(impl.trim, properties, hiveConf, fs, basePath.toString, baseFileFormat) }) } true diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java index 2088d48d8a383..97838d03ed66b 100644 --- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java +++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java @@ -114,7 +114,7 @@ private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) { LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieDLAClient.getBasePath() + " of type " + hoodieDLAClient.getTableType()); // Check if the necessary table exists - boolean tableExists = hoodieDLAClient.doesTableExist(tableName); + boolean tableExists = hoodieDLAClient.tableExists(tableName); // Get the parquet schema for this table looking at the latest commit MessageType schema = hoodieDLAClient.getDataSchema(); // Sync schema if needed diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java index 77d7362fa8166..54192b6a86204 100644 --- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java +++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java @@ -18,8 +18,6 @@ package org.apache.hudi.dla; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; @@ -31,14 +29,17 @@ import org.apache.hudi.hive.SchemaDifference; import org.apache.hudi.hive.util.HiveSchemaUtil; import org.apache.hudi.sync.common.AbstractSyncHoodieClient; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import 
org.apache.parquet.schema.MessageType; import java.io.IOException; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.DatabaseMetaData; +import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -115,7 +116,7 @@ public void createTable(String tableName, MessageType storageSchema, String inpu } public Map getTableSchema(String tableName) { - if (!doesTableExist(tableName)) { + if (!tableExists(tableName)) { throw new IllegalArgumentException( "Failed to get schema for table " + tableName + " does not exist"); } @@ -222,6 +223,11 @@ private void updateDLASQL(String sql) { @Override public boolean doesTableExist(String tableName) { + return tableExists(tableName); + } + + @Override + public boolean tableExists(String tableName) { String sql = consutructShowCreateTableSQL(tableName); Statement stmt = null; ResultSet rs = null; @@ -274,6 +280,22 @@ public void updateLastCommitTimeSynced(String tableName) { // TODO : dla do not support update tblproperties, so do nothing. } + @Override + public Option getLastReplicatedTime(String tableName) { + // no op; unsupported + return Option.empty(); + } + + @Override + public void updateLastReplicatedTimeStamp(String tableName, String timeStamp) { + // no op; unsupported + } + + @Override + public void deleteLastReplicatedTimeStamp(String tableName) { + // no op; unsupported + } + @Override public void updatePartitionsToTable(String tableName, List changedPartitions) { if (changedPartitions.isEmpty()) { @@ -370,6 +392,7 @@ public void updateTableDefinition(String tableName, SchemaDifference schemaDiff) } } + @Override public void close() { try { if (connection != null) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/AbstractHiveSyncHoodieClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/AbstractHiveSyncHoodieClient.java new file mode 100644 index 0000000000000..f0641b6fc08b4 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/AbstractHiveSyncHoodieClient.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.hive; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.sync.common.AbstractSyncHoodieClient; +import org.apache.hudi.sync.common.HoodieSyncException; +import org.apache.hudi.sync.common.model.Partition; + +import org.apache.avro.Schema; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.parquet.schema.MessageType; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Base class to sync Hudi tables with Hive based metastores, such as Hive server, HMS or managed Hive services. + */ +public abstract class AbstractHiveSyncHoodieClient extends AbstractSyncHoodieClient { + + protected final HoodieTimeline activeTimeline; + protected final HiveSyncConfig syncConfig; + protected final Configuration hadoopConf; + protected final PartitionValueExtractor partitionValueExtractor; + + public AbstractHiveSyncHoodieClient(HiveSyncConfig syncConfig, Configuration hadoopConf, FileSystem fs) { + super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata, syncConfig.withOperationField, fs); + this.syncConfig = syncConfig; + this.hadoopConf = hadoopConf; + this.partitionValueExtractor = ReflectionUtils.loadClass(syncConfig.partitionValueExtractorClass); + this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + } + + public HoodieTimeline getActiveTimeline() { + return activeTimeline; + } + + /** + * Iterate over the storage partitions and find if there are any new partitions that need to be added or updated. + * Generate a list of PartitionEvent based on the changes required. + */ + protected List getPartitionEvents(List tablePartitions, List partitionStoragePartitions, boolean isDropPartition) { + Map paths = new HashMap<>(); + for (Partition tablePartition : tablePartitions) { + List hivePartitionValues = tablePartition.getValues(); + String fullTablePartitionPath = + Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getStorageLocation())).toUri().getPath(); + paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath); + } + + List events = new ArrayList<>(); + for (String storagePartition : partitionStoragePartitions) { + Path storagePartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, storagePartition); + String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); + // Check if the partition values or if hdfs path is the same + List storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition); + + if (isDropPartition) { + events.add(PartitionEvent.newPartitionDropEvent(storagePartition)); + } else { + if (!storagePartitionValues.isEmpty()) { + String storageValue = String.join(", ", storagePartitionValues); + if (!paths.containsKey(storageValue)) { + events.add(PartitionEvent.newPartitionAddEvent(storagePartition)); + } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) { + events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition)); + } + } + } + } + return events; + } + + /** + * Get all partitions for the table in the metastore. 
+ */ + public abstract List getAllPartitions(String tableName); + + /** + * Check if a database already exists in the metastore. + */ + public abstract boolean databaseExists(String databaseName); + + /** + * Create a database in the metastore. + */ + public abstract void createDatabase(String databaseName); + + /** + * Update schema for the table in the metastore. + */ + public abstract void updateTableDefinition(String tableName, MessageType newSchema); + + /* + * APIs below need to be re-worked by modeling field comment in hudi-sync-common, + * instead of relying on Avro or Hive schema class. + */ + + public Schema getAvroSchemaWithoutMetadataFields() { + try { + return new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields(); + } catch (Exception e) { + throw new HoodieSyncException("Failed to read avro schema", e); + } + } + + public abstract List getTableCommentUsingMetastoreClient(String tableName); + + public abstract void updateTableComments(String tableName, List oldSchema, List newSchema); + + public abstract void updateTableComments(String tableName, List oldSchema, Map newComments); + + /* + * APIs above need to be re-worked by modeling field comment in hudi-sync-common, + * instead of relying on Avro or Hive schema class. + */ +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index cac70ab5446e4..b6c4069a21ca8 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -18,14 +18,6 @@ package org.apache.hudi.hive; -import com.beust.jcommander.JCommander; -import org.apache.avro.Schema; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Partition; - import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieFileFormat; @@ -41,7 +33,14 @@ import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType; import org.apache.hudi.sync.common.AbstractSyncTool; +import org.apache.hudi.sync.common.model.Partition; +import com.beust.jcommander.JCommander; +import org.apache.avro.Schema; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.parquet.schema.GroupType; @@ -66,35 +65,33 @@ * partitions incrementally (all the partitions modified since the last commit) */ @SuppressWarnings("WeakerAccess") -public class HiveSyncTool extends AbstractSyncTool { +public class HiveSyncTool extends AbstractSyncTool implements AutoCloseable { private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class); public static final String SUFFIX_SNAPSHOT_TABLE = "_rt"; public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro"; - protected final HiveSyncConfig hiveSyncConfig; - protected HoodieHiveClient hoodieHiveClient = null; + protected HiveSyncConfig hiveSyncConfig; + protected AbstractHiveSyncHoodieClient hoodieHiveClient; protected String 
snapshotTableName = null; protected Option roTableName = null; public HiveSyncTool(TypedProperties props, Configuration conf, FileSystem fs) { - super(props, conf, fs); - this.hiveSyncConfig = new HiveSyncConfig(props); - init(hiveSyncConfig, new HiveConf(conf, HiveConf.class)); + this(new HiveSyncConfig(props), new HiveConf(conf, HiveConf.class), fs); } - @Deprecated public HiveSyncTool(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf, FileSystem fs) { super(hiveSyncConfig.getProps(), hiveConf, fs); - this.hiveSyncConfig = hiveSyncConfig; - init(hiveSyncConfig, hiveConf); + // TODO: reconcile the way to set METASTOREURIS + if (StringUtils.isNullOrEmpty(hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))) { + hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveSyncConfig.metastoreUris); + } + initClient(hiveSyncConfig, hiveConf); + initConfig(hiveSyncConfig); } - private void init(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf) { + protected void initClient(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf) { try { - if (StringUtils.isNullOrEmpty(hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))) { - hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, hiveSyncConfig.metastoreUris); - } this.hoodieHiveClient = new HoodieHiveClient(hiveSyncConfig, hiveConf, fs); } catch (RuntimeException e) { if (hiveSyncConfig.ignoreExceptions) { @@ -103,12 +100,16 @@ private void init(HiveSyncConfig hiveSyncConfig, HiveConf hiveConf) { throw new HoodieHiveSyncException("Got runtime exception when hive syncing", e); } } + } + private void initConfig(HiveSyncConfig hiveSyncConfig) { // Set partitionFields to empty, when the NonPartitionedExtractor is used + // TODO: HiveSyncConfig should be responsible for inferring config value if (NonPartitionedExtractor.class.getName().equals(hiveSyncConfig.partitionValueExtractorClass)) { LOG.warn("Set partitionFields to empty, since the NonPartitionedExtractor is used"); hiveSyncConfig.partitionFields = new ArrayList<>(); } + this.hiveSyncConfig = hiveSyncConfig; if (hoodieHiveClient != null) { switch (hoodieHiveClient.getTableType()) { case COPY_ON_WRITE: @@ -139,9 +140,7 @@ public void syncHoodieTable() { } catch (RuntimeException re) { throw new HoodieException("Got runtime exception when hive syncing " + hiveSyncConfig.tableName, re); } finally { - if (hoodieHiveClient != null) { - hoodieHiveClient.close(); - } + close(); } } @@ -162,6 +161,17 @@ protected void doSync() { } } + @Override + public void close() { + if (hoodieHiveClient != null) { + try { + hoodieHiveClient.close(); + } catch (Exception e) { + throw new HoodieHiveSyncException("Fail to close sync client.", e); + } + } + } + protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, boolean readAsOptimized) { LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieHiveClient.getBasePath() @@ -170,7 +180,7 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, // check if the database exists else create it if (hiveSyncConfig.autoCreateDatabase) { try { - if (!hoodieHiveClient.doesDataBaseExist(hiveSyncConfig.databaseName)) { + if (!hoodieHiveClient.databaseExists(hiveSyncConfig.databaseName)) { hoodieHiveClient.createDatabase(hiveSyncConfig.databaseName); } } catch (Exception e) { @@ -178,14 +188,14 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat, LOG.warn("Unable to create database", e); } } else { - if (!hoodieHiveClient.doesDataBaseExist(hiveSyncConfig.databaseName)) { + if 
(!hoodieHiveClient.databaseExists(hiveSyncConfig.databaseName)) { LOG.error("Hive database does not exist " + hiveSyncConfig.databaseName); throw new HoodieHiveSyncException("hive database does not exist " + hiveSyncConfig.databaseName); } } // Check if the necessary table exists - boolean tableExists = hoodieHiveClient.doesTableExist(tableName); + boolean tableExists = hoodieHiveClient.tableExists(tableName); // check if isDropPartition boolean isDropPartition = hoodieHiveClient.isDropPartition(); @@ -375,7 +385,7 @@ private Map getSparkSerdeProperties(boolean readAsOptimized) { private boolean syncPartitions(String tableName, List writtenPartitionsSince, boolean isDropPartition) { boolean partitionsChanged; try { - List hivePartitions = hoodieHiveClient.scanTablePartitions(tableName); + List hivePartitions = hoodieHiveClient.getAllPartitions(tableName); List partitionEvents = hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, isDropPartition); diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index 70a88a7aabbd1..a61e7cb6bc0e6 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -18,10 +18,7 @@ package org.apache.hudi.hive; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.ImmutablePair; @@ -30,18 +27,14 @@ import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor; import org.apache.hudi.hive.ddl.HiveSyncMode; import org.apache.hudi.hive.ddl.JDBCExecutor; -import org.apache.hudi.hive.util.HiveSchemaUtil; -import org.apache.hudi.sync.common.AbstractSyncHoodieClient; -import org.apache.hudi.sync.common.HoodieSyncException; +import org.apache.hudi.sync.common.model.Partition; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.log4j.LogManager; @@ -49,7 +42,6 @@ import org.apache.parquet.schema.MessageType; import org.apache.thrift.TException; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -57,22 +49,19 @@ import java.util.stream.Collectors; import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP; +import static org.apache.hudi.sync.common.util.TableUtils.tableId; -public class HoodieHiveClient extends AbstractSyncHoodieClient { - - private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync"; - private static final String HIVE_ESCAPE_CHARACTER = HiveSchemaUtil.HIVE_ESCAPE_CHARACTER; +/** + * This class implements logic to sync a Hudi table with either the Hive server or the Hive Metastore. 
+ */ +public class HoodieHiveClient extends AbstractHiveSyncHoodieClient { private static final Logger LOG = LogManager.getLogger(HoodieHiveClient.class); - private final PartitionValueExtractor partitionValueExtractor; - private final HoodieTimeline activeTimeline; DDLExecutor ddlExecutor; private IMetaStoreClient client; - private final HiveSyncConfig syncConfig; public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) { - super(cfg.basePath, cfg.assumeDatePartitioning, cfg.useFileListingFromMetadata, cfg.withOperationField, fs); - this.syncConfig = cfg; + super(cfg, configuration, fs); // Support JDBC, HiveQL and metastore based implementations for backwards compatibility. Future users should // disable jdbc and depend on metastore client for all hive registrations @@ -99,20 +88,6 @@ public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem f } catch (Exception e) { throw new HoodieHiveSyncException("Failed to create HiveMetaStoreClient", e); } - - try { - this.partitionValueExtractor = - (PartitionValueExtractor) Class.forName(cfg.partitionValueExtractorClass).newInstance(); - } catch (Exception e) { - throw new HoodieHiveSyncException( - "Failed to initialize PartitionValueExtractor class " + cfg.partitionValueExtractorClass, e); - } - - activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - } - - public HoodieTimeline getActiveTimeline() { - return activeTimeline; } /** @@ -159,61 +134,33 @@ public void updateTableProperties(String tableName, Map tablePro } } - /** - * Iterate over the storage partitions and find if there are any new partitions that need to be added or updated. - * Generate a list of PartitionEvent based on the changes required. - */ - List getPartitionEvents(List tablePartitions, List partitionStoragePartitions) { - return getPartitionEvents(tablePartitions, partitionStoragePartitions, false); - } - - /** - * Iterate over the storage partitions and find if there are any new partitions that need to be added or updated. - * Generate a list of PartitionEvent based on the changes required. 
- */ - List getPartitionEvents(List tablePartitions, List partitionStoragePartitions, boolean isDropPartition) { - Map paths = new HashMap<>(); - for (Partition tablePartition : tablePartitions) { - List hivePartitionValues = tablePartition.getValues(); - String fullTablePartitionPath = - Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getSd().getLocation())).toUri().getPath(); - paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath); - } - - List events = new ArrayList<>(); - for (String storagePartition : partitionStoragePartitions) { - Path storagePartitionPath = FSUtils.getPartitionPath(syncConfig.basePath, storagePartition); - String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); - // Check if the partition values or if hdfs path is the same - List storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition); - - if (isDropPartition) { - events.add(PartitionEvent.newPartitionDropEvent(storagePartition)); - } else { - if (!storagePartitionValues.isEmpty()) { - String storageValue = String.join(", ", storagePartitionValues); - if (!paths.containsKey(storageValue)) { - events.add(PartitionEvent.newPartitionAddEvent(storagePartition)); - } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) { - events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition)); - } - } - } - } - return events; - } - /** * Scan table partitions. + * + * @deprecated Use {@link #getAllPartitions} instead. */ - public List scanTablePartitions(String tableName) throws TException { + @Deprecated + public List scanTablePartitions(String tableName) throws TException { return client.listPartitions(syncConfig.databaseName, tableName, (short) -1); } - void updateTableDefinition(String tableName, MessageType newSchema) { + @Override + public void updateTableDefinition(String tableName, MessageType newSchema) { ddlExecutor.updateTableDefinition(tableName, newSchema); } + @Override + public List getAllPartitions(String tableName) { + try { + return client.listPartitions(syncConfig.databaseName, tableName, (short) -1) + .stream() + .map(p -> new Partition(p.getValues(), p.getSd().getLocation())) + .collect(Collectors.toList()); + } catch (TException e) { + throw new HoodieHiveSyncException("Failed to get all partitions for table " + tableId(syncConfig.databaseName, tableName), e); + } + } + @Override public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass, @@ -226,18 +173,21 @@ public void createTable(String tableName, MessageType storageSchema, String inpu */ @Override public Map getTableSchema(String tableName) { - if (!doesTableExist(tableName)) { + if (!tableExists(tableName)) { throw new IllegalArgumentException( "Failed to get schema for table " + tableName + " does not exist"); } return ddlExecutor.getTableSchema(tableName); } - /** - * @return true if the configured table exists - */ + @Deprecated @Override public boolean doesTableExist(String tableName) { + return tableExists(tableName); + } + + @Override + public boolean tableExists(String tableName) { try { return client.tableExists(syncConfig.databaseName, tableName); } catch (TException e) { @@ -245,11 +195,13 @@ public boolean doesTableExist(String tableName) { } } - /** - * @param databaseName - * @return true if the configured database exists - */ + @Deprecated public boolean doesDataBaseExist(String databaseName) { + return 
@@ -226,18 +173,21 @@ public void createTable(String tableName, MessageType storageSchema, String inpu
    */
   @Override
   public Map getTableSchema(String tableName) {
-    if (!doesTableExist(tableName)) {
+    if (!tableExists(tableName)) {
       throw new IllegalArgumentException(
           "Failed to get schema for table " + tableName + " does not exist");
     }
     return ddlExecutor.getTableSchema(tableName);
   }
 
-  /**
-   * @return true if the configured table exists
-   */
+  @Deprecated
   @Override
   public boolean doesTableExist(String tableName) {
+    return tableExists(tableName);
+  }
+
+  @Override
+  public boolean tableExists(String tableName) {
     try {
       return client.tableExists(syncConfig.databaseName, tableName);
     } catch (TException e) {
@@ -245,11 +195,13 @@ public boolean doesTableExist(String tableName) {
     }
   }
 
-  /**
-   * @param databaseName
-   * @return true if the configured database exists
-   */
+  @Deprecated
   public boolean doesDataBaseExist(String databaseName) {
+    return databaseExists(databaseName);
+  }
+
+  @Override
+  public boolean databaseExists(String databaseName) {
     try {
       client.getDatabase(databaseName);
       return true;
@@ -261,6 +213,7 @@ public boolean doesDataBaseExist(String databaseName) {
     }
   }
 
+  @Override
   public void createDatabase(String databaseName) {
     ddlExecutor.createDatabase(databaseName);
   }
@@ -321,6 +274,7 @@ public void deleteLastReplicatedTimeStamp(String tableName) {
     }
   }
 
+  @Override
   public void close() {
     try {
       ddlExecutor.close();
@@ -333,10 +287,6 @@ public void close() {
     }
   }
 
-  List getAllTables(String db) throws Exception {
-    return client.getAllTables(db);
-  }
-
   @Override
   public void updateLastCommitTimeSynced(String tableName) {
     // Set the last commit time from the TBLproperties
@@ -352,14 +302,7 @@ public void updateLastCommitTimeSynced(String tableName) {
     }
   }
 
-  public Schema getAvroSchemaWithoutMetadataFields() {
-    try {
-      return new TableSchemaResolver(metaClient).getTableAvroSchemaWithoutMetadataFields();
-    } catch (Exception e) {
-      throw new HoodieSyncException("Failed to read avro schema", e);
-    }
-  }
-
+  @Override
   public List getTableCommentUsingMetastoreClient(String tableName) {
     try {
       return client.getSchema(syncConfig.databaseName, tableName);
@@ -368,11 +311,13 @@ public List getTableCommentUsingMetastoreClient(String tableName) {
     }
   }
 
+  @Override
   public void updateTableComments(String tableName, List oldSchema, List newSchema) {
     Map newComments = newSchema.stream().collect(Collectors.toMap(field -> field.name().toLowerCase(Locale.ROOT), field -> StringUtils.isNullOrEmpty(field.doc()) ? "" : field.doc()));
     updateTableComments(tableName,oldSchema,newComments);
   }
 
+  @Override
   public void updateTableComments(String tableName, List oldSchema, Map newComments) {
     Map oldComments = oldSchema.stream().collect(Collectors.toMap(fieldSchema -> fieldSchema.getName().toLowerCase(Locale.ROOT), fieldSchema -> StringUtils.isNullOrEmpty(fieldSchema.getComment()) ? "" : fieldSchema.getComment()));
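The renamed databaseExists/createDatabase pair supports the auto-create-database flow exercised by the tests further down. A minimal sketch of that flow, assuming a boolean flag equivalent to HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE; the real HiveSyncTool logic is not part of this hunk.

import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.HoodieHiveSyncException;

class EnsureDatabaseSketch {
  // Illustrative only: ensure the target database exists before syncing a table.
  static void ensureDatabase(HoodieHiveClient client, String databaseName, boolean autoCreateDatabase) {
    if (client.databaseExists(databaseName)) {
      return;
    }
    if (autoCreateDatabase) {
      client.createDatabase(databaseName);  // delegates to the configured DDLExecutor
    } else {
      throw new HoodieHiveSyncException("Database " + databaseName + " does not exist and auto-create is disabled");
    }
  }
}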
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncException.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncException.java
index d52ac71aa3f16..f4ece02389195 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncException.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncException.java
@@ -20,10 +20,6 @@
 public class HoodieHiveSyncException extends RuntimeException {
 
-  public HoodieHiveSyncException() {
-    super();
-  }
-
   public HoodieHiveSyncException(String message) {
     super(message);
   }
@@ -32,11 +28,4 @@ public HoodieHiveSyncException(String message, Throwable t) {
     super(message, t);
   }
 
-  public HoodieHiveSyncException(Throwable t) {
-    super(t);
-  }
-
-  protected static String format(String message, Object... args) {
-    return String.format(String.valueOf(message), (Object[]) args);
-  }
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java
index 8cab505f1465b..7b22e56d4538c 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/ddl/DDLExecutor.java
@@ -30,11 +30,12 @@
  * There are two main implementations one is QueryBased other is based on HiveMetaStore
  * QueryBasedDDLExecutor also has two implementations namely HiveQL based and other JDBC based.
  */
-public interface DDLExecutor {
+public interface DDLExecutor extends AutoCloseable {
+
   /**
    * @param databaseName name of database to be created.
    */
-  public void createDatabase(String databaseName);
+  void createDatabase(String databaseName);
 
   /**
    * Creates a table with the following properties.
@@ -47,9 +48,9 @@ public interface DDLExecutor {
    * @param serdeProperties
    * @param tableProperties
    */
-  public void createTable(String tableName, MessageType storageSchema, String inputFormatClass,
-                          String outputFormatClass, String serdeClass,
-                          Map serdeProperties, Map tableProperties);
+  void createTable(String tableName, MessageType storageSchema, String inputFormatClass,
+                   String outputFormatClass, String serdeClass,
+                   Map serdeProperties, Map tableProperties);
 
   /**
    * Updates the table with the newSchema.
@@ -57,7 +58,7 @@ public void createTable(String tableName, MessageType storageSchema, String inpu
    * @param tableName
    * @param newSchema
    */
-  public void updateTableDefinition(String tableName, MessageType newSchema);
+  void updateTableDefinition(String tableName, MessageType newSchema);
 
   /**
    * Fetches tableSchema for a table.
@@ -65,7 +66,7 @@ public void createTable(String tableName, MessageType storageSchema, String inpu
    * @param tableName
    * @return
    */
-  public Map getTableSchema(String tableName);
+  Map getTableSchema(String tableName);
 
   /**
    * Adds partition to table.
@@ -73,7 +74,7 @@ public void createTable(String tableName, MessageType storageSchema, String inpu
    * @param tableName
    * @param partitionsToAdd
   */
-  public void addPartitionsToTable(String tableName, List partitionsToAdd);
+  void addPartitionsToTable(String tableName, List partitionsToAdd);
 
   /**
    * Updates partitions for a given table.
@@ -81,7 +82,7 @@ public void createTable(String tableName, MessageType storageSchema, String inpu
    * @param tableName
    * @param changedPartitions
    */
-  public void updatePartitionsToTable(String tableName, List changedPartitions);
+  void updatePartitionsToTable(String tableName, List changedPartitions);
 
   /**
    * Drop partitions for a given table.
@@ -89,15 +90,13 @@ public void createTable(String tableName, MessageType storageSchema, String inpu
    * @param tableName
    * @param partitionsToDrop
    */
-  public void dropPartitionsToTable(String tableName, List partitionsToDrop);
+  void dropPartitionsToTable(String tableName, List partitionsToDrop);
 
   /**
    * update table comments
    *
    * @param tableName
-   * @param newSchema
+   * @param newSchema Map key: field name, Map value: [field type, field comment]
    */
-  public void updateTableComments(String tableName, Map> newSchema);
-
-  public void close();
+  void updateTableComments(String tableName, Map> newSchema);
 }
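Two consequences of this interface change are worth spelling out: DDLExecutor instances can now sit in try-with-resources (the inherited AutoCloseable.close() is declared to throw Exception, hence the throws clause below), and updateTableComments now documents its map as field name mapped to a [field type, field comment] pair. A minimal sketch of the resource-management side; this is not code from the patch, just an illustration of the contract.

import org.apache.hudi.hive.ddl.DDLExecutor;
import org.apache.parquet.schema.MessageType;

class DdlExecutorSketch {
  // AutoCloseable.close() may throw a checked Exception, so the caller propagates it here.
  static void updateSchema(DDLExecutor ddlExecutor, String tableName, MessageType newSchema) throws Exception {
    try (DDLExecutor executor = ddlExecutor) {
      executor.updateTableDefinition(tableName, newSchema);
    } // close() is invoked automatically, even if updateTableDefinition throws
  }
}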
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
index 51b2a77ae7433..a7d205962e25c 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
@@ -55,10 +55,6 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
     LOG.info("Sync complete for " + tableName);
   }
 
-  public void close() {
-    hoodieHiveClient.close();
-  }
-
   public Map> getLastReplicatedTimeStampMap() {
     Map> timeStampMap = new HashMap<>();
     Option timeStamp = hoodieHiveClient.getLastReplicatedTime(snapshotTableName);
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 0e23615d5dadd..1c2d53ed96ded 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -138,12 +138,12 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode)
     HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
 
     reinitHiveSyncClient();
-    assertFalse(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME),
+    assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
         "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
     // Lets do the sync
     reSyncHiveTable();
-    assertTrue(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME),
+    assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
         "Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes");
     assertEquals(hiveClient.getTableSchema(HiveTestUtil.TABLE_NAME).size(),
         hiveClient.getDataSchema().getColumns().size() + 1,
@@ -176,9 +176,9 @@ public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode)
     ddlExecutor.runSQL("ALTER TABLE `" + HiveTestUtil.TABLE_NAME + "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'");
 
-    List hivePartitions = hiveClient.scanTablePartitions(HiveTestUtil.TABLE_NAME);
+    List hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
     List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty());
-    List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
+    List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false);
     assertEquals(1, partitionEvents.size(), "There should be only one partition event");
     assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType,
         "The one partition event must of type UPDATE");
@@ -211,20 +211,20 @@ public void testSyncDataBase(String
syncMode) throws Exception { hiveSyncProps.setProperty(HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key(), "true"); reinitHiveSyncClient(); assertDoesNotThrow((this::reSyncHiveTable)); - assertTrue(hiveClient.doesDataBaseExist(HiveTestUtil.DB_NAME), + assertTrue(hiveClient.databaseExists(HiveTestUtil.DB_NAME), "DataBases " + HiveTestUtil.DB_NAME + " should exist after sync completes"); // while autoCreateDatabase is false and database exists; hiveSyncProps.setProperty(HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key(), "false"); reinitHiveSyncClient(); assertDoesNotThrow((this::reSyncHiveTable)); - assertTrue(hiveClient.doesDataBaseExist(HiveTestUtil.DB_NAME), + assertTrue(hiveClient.databaseExists(HiveTestUtil.DB_NAME), "DataBases " + HiveTestUtil.DB_NAME + " should exist after sync completes"); // while autoCreateDatabase is true and database exists; hiveSyncProps.setProperty(HiveSyncConfig.HIVE_AUTO_CREATE_DATABASE.key(), "true"); assertDoesNotThrow((this::reSyncHiveTable)); - assertTrue(hiveClient.doesDataBaseExist(HiveTestUtil.DB_NAME), + assertTrue(hiveClient.databaseExists(HiveTestUtil.DB_NAME), "DataBases " + HiveTestUtil.DB_NAME + " should exist after sync completes"); } @@ -457,8 +457,8 @@ public void testSyncIncremental(String syncMode) throws Exception { reSyncHiveTable(); List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1)); assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit"); - List hivePartitions = hiveClient.scanTablePartitions(HiveTestUtil.TABLE_NAME); - List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince); + List hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); + List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false); assertEquals(1, partitionEvents.size(), "There should be only one partition event"); assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD"); @@ -581,11 +581,11 @@ public void testSyncMergeOnRead(boolean useSchemaFromCommitMetadata, String sync String roTableName = HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE; reinitHiveSyncClient(); - assertFalse(hiveClient.doesTableExist(roTableName), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially"); + assertFalse(hiveClient.tableExists(roTableName), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially"); // Lets do the sync reSyncHiveTable(); - assertTrue(hiveClient.doesTableExist(roTableName), "Table " + roTableName + " should exist after sync completes"); + assertTrue(hiveClient.tableExists(roTableName), "Table " + roTableName + " should exist after sync completes"); if (useSchemaFromCommitMetadata) { assertEquals(hiveClient.getTableSchema(roTableName).size(), @@ -643,14 +643,14 @@ public void testSyncMergeOnReadRT(boolean useSchemaFromCommitMetadata, String sy String snapshotTableName = HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE; HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true, useSchemaFromCommitMetadata); reinitHiveSyncClient(); - assertFalse(hiveClient.doesTableExist(snapshotTableName), + assertFalse(hiveClient.tableExists(snapshotTableName), "Table " + HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE + " should not exist initially"); // Lets do the sync reSyncHiveTable(); - assertTrue(hiveClient.doesTableExist(snapshotTableName), + 
assertTrue(hiveClient.tableExists(snapshotTableName), "Table " + HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE + " should exist after sync completes"); @@ -713,11 +713,11 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception { HiveTestUtil.getCreatedTablesSet().add(HiveTestUtil.DB_NAME + "." + HiveTestUtil.TABLE_NAME); reinitHiveSyncClient(); - assertFalse(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), + assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially"); // Lets do the sync reSyncHiveTable(); - assertTrue(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), + assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes"); assertEquals(hiveClient.getTableSchema(HiveTestUtil.TABLE_NAME).size(), hiveClient.getDataSchema().getColumns().size() + 3, @@ -736,8 +736,8 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception { reinitHiveSyncClient(); List writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime)); assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit"); - List hivePartitions = hiveClient.scanTablePartitions(HiveTestUtil.TABLE_NAME); - List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince); + List hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); + List partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, false); assertEquals(1, partitionEvents.size(), "There should be only one partition event"); assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD"); @@ -755,7 +755,7 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception { reinitHiveSyncClient(); reSyncHiveTable(); - assertTrue(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), + assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes"); assertEquals(hiveClient.getTableSchema(HiveTestUtil.TABLE_NAME).size(), hiveClient.getDataSchema().getColumns().size() + 3, @@ -776,12 +776,12 @@ public void testDropPartitionKeySync(String syncMode) throws Exception { HiveTestUtil.createCOWTable(instantTime, 1, true); reinitHiveSyncClient(); - assertFalse(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), + assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially"); // Lets do the sync reSyncHiveTable(); - assertTrue(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), + assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes"); assertEquals(hiveClient.getTableSchema(HiveTestUtil.TABLE_NAME).size(), hiveClient.getDataSchema().getColumns().size() + 1, @@ -820,11 +820,11 @@ public void testDropPartition(String syncMode) throws Exception { HiveTestUtil.createCOWTable(instantTime, 1, true); reinitHiveSyncClient(); - assertFalse(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), + assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially"); // Lets do the sync reSyncHiveTable(); - assertTrue(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), + 
assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes"); assertEquals(hiveClient.getTableSchema(HiveTestUtil.TABLE_NAME).size(), hiveClient.getDataSchema().getColumns().size() + 1, @@ -860,11 +860,11 @@ public void testNonPartitionedSync(String syncMode) throws Exception { HiveTestUtil.getCreatedTablesSet().add(HiveTestUtil.DB_NAME + "." + HiveTestUtil.TABLE_NAME); reinitHiveSyncClient(); - assertFalse(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), + assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially"); // Lets do the sync reSyncHiveTable(); - assertTrue(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), + assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes"); assertEquals(hiveClient.getTableSchema(HiveTestUtil.TABLE_NAME).size(), hiveClient.getDataSchema().getColumns().size(), @@ -882,13 +882,13 @@ public void testReadSchemaForMOR(String syncMode) throws Exception { HiveTestUtil.createMORTable(commitTime, "", 5, false, true); reinitHiveSyncClient(); - assertFalse(hiveClient.doesTableExist(snapshotTableName), "Table " + HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE + assertFalse(hiveClient.tableExists(snapshotTableName), "Table " + HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE + " should not exist initially"); // Lets do the sync reSyncHiveTable(); - assertTrue(hiveClient.doesTableExist(snapshotTableName), "Table " + HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE + assertTrue(hiveClient.tableExists(snapshotTableName), "Table " + HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE + " should exist after sync completes"); // Schema being read from compacted base files @@ -925,7 +925,7 @@ public void testConnectExceptionIgnoreConfigSet() throws IOException, URISyntaxE HiveTestUtil.createCOWTable(instantTime, 5, false); reinitHiveSyncClient(); HoodieHiveClient prevHiveClient = hiveClient; - assertFalse(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), + assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially"); // Lets do the sync @@ -936,12 +936,12 @@ public void testConnectExceptionIgnoreConfigSet() throws IOException, URISyntaxE reSyncHiveTable(); assertNull(hiveClient); - assertFalse(prevHiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), + assertFalse(prevHiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially"); } private void verifyOldParquetFileTest(HoodieHiveClient hiveClient, String emptyCommitTime) throws Exception { - assertTrue(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes"); + assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should exist after sync completes"); assertEquals(hiveClient.getTableSchema(HiveTestUtil.TABLE_NAME).size(), hiveClient.getDataSchema().getColumns().size() + 1, "Hive Schema should match the table schema + partition field"); @@ -973,7 +973,7 @@ public void testPickingOlderParquetFileIfLatestIsEmptyCommit(String syncMode) th final String emptyCommitTime = "200"; HiveTestUtil.createCommitFileWithSchema(commitMetadata, emptyCommitTime, true); reinitHiveSyncClient(); - 
assertFalse(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially"); + assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially"); reinitHiveSyncClient(); reSyncHiveTable(); @@ -1000,7 +1000,7 @@ public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(String syncM reinitHiveSyncClient(); assertFalse( - hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially"); + hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially"); HiveSyncTool tool = new HiveSyncTool(hiveSyncProps, getHiveConf(), fileSystem); // now delete the evolved commit instant @@ -1017,7 +1017,7 @@ public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(String syncM } // table should not be synced yet - assertFalse(hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist at all"); + assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist at all"); } @ParameterizedTest @@ -1033,7 +1033,7 @@ public void testNotPickingOlderParquetFileWhenLatestCommitReadFailsForExistingTa //HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime); reinitHiveSyncClient(); assertFalse( - hiveClient.doesTableExist(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially"); + hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " + HiveTestUtil.TABLE_NAME + " should not exist initially"); reSyncHiveTable(); @@ -1120,7 +1120,7 @@ public void testSyncWithoutDiffs(String syncMode) throws Exception { reinitHiveSyncClient(); reSyncHiveTable(); - assertTrue(hiveClient.doesTableExist(tableName)); + assertTrue(hiveClient.tableExists(tableName)); assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(tableName).get()); HiveTestUtil.addMORPartitions(0, true, true, true, ZonedDateTime.now().plusDays(2), commitTime1, commitTime2); @@ -1138,7 +1138,7 @@ private void reSyncHiveTable() { private void reinitHiveSyncClient() { hiveSyncTool = new HiveSyncTool(hiveSyncProps, HiveTestUtil.getHiveConf(), fileSystem); - hiveClient = hiveSyncTool.hoodieHiveClient; + hiveClient = (HoodieHiveClient) hiveSyncTool.hoodieHiveClient; } private int getPartitionFieldSize() { diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java index 1815491f1867e..33fc204788b2c 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java @@ -18,8 +18,6 @@ package org.apache.hudi.sync.common; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -31,6 +29,9 @@ import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import 
org.apache.parquet.schema.MessageType; @@ -43,10 +44,11 @@ import java.util.Map; import java.util.Objects; -public abstract class AbstractSyncHoodieClient { +public abstract class AbstractSyncHoodieClient implements AutoCloseable { private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class); + public static final String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync"; public static final TypeConverter TYPE_CONVERTOR = new TypeConverter() {}; protected final HoodieTableMetaClient metaClient; @@ -89,12 +91,24 @@ public abstract void createTable(String tableName, MessageType storageSchema, String serdeClass, Map serdeProperties, Map tableProperties); + /** + * @deprecated Use {@link #tableExists} instead. + */ + @Deprecated public abstract boolean doesTableExist(String tableName); + public abstract boolean tableExists(String tableName); + public abstract Option getLastCommitTimeSynced(String tableName); public abstract void updateLastCommitTimeSynced(String tableName); + public abstract Option getLastReplicatedTime(String tableName); + + public abstract void updateLastReplicatedTimeStamp(String tableName, String timeStamp); + + public abstract void deleteLastReplicatedTimeStamp(String tableName); + public abstract void addPartitionsToTable(String tableName, List partitionsToAdd); public abstract void updatePartitionsToTable(String tableName, List changedPartitions); diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/Partition.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/Partition.java new file mode 100644 index 0000000000000..8e2076f95cb9f --- /dev/null +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/model/Partition.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.sync.common.model; + +import java.util.List; + +public class Partition { + + private final List values; + + private final String storageLocation; + + public Partition(List values, String storageLocation) { + this.values = values; + this.storageLocation = storageLocation; + } + + public List getValues() { + return values; + } + + public String getStorageLocation() { + return storageLocation; + } +} diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/TableUtils.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/TableUtils.java new file mode 100644 index 0000000000000..d392bb64184f2 --- /dev/null +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/TableUtils.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.sync.common.util; + +public final class TableUtils { + + public static String tableId(String database, String table) { + return String.format("%s.%s", database, table); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java index f6ea5c0f55485..d6837a384aa0d 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java @@ -167,7 +167,7 @@ public void testPuller() throws IOException, URISyntaxException { puller.saveDelta(); HoodieHiveClient assertingClient = new HoodieHiveClient(new HiveSyncConfig(getAssertionSyncConfig(cfg.tmpDb)), HiveTestUtil.getHiveConf(), fileSystem); String tmpTable = cfg.targetTable + "__" + cfg.sourceTable; - assertTrue(assertingClient.doesTableExist(tmpTable)); + assertTrue(assertingClient.tableExists(tmpTable)); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 18cc787fcee21..62b862e4b86a5 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -1240,8 +1240,8 @@ public void testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() t HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, "hive_trips"); hiveSyncConfig.partitionFields = CollectionUtils.createImmutableList("year", "month", "day"); HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), dfs); - assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName), "Table " + hiveSyncConfig.tableName + " should exist"); - assertEquals(3, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(), + assertTrue(hiveClient.tableExists(hiveSyncConfig.tableName), "Table " + hiveSyncConfig.tableName + " should exist"); + assertEquals(3, hiveClient.getAllPartitions(hiveSyncConfig.tableName).size(), "Table partitions should match the number of partitions we wrote"); assertEquals(lastInstantForUpstreamTable, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(), diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java index 8464740bf2bf0..cc93fe497563f 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java +++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -201,7 +201,7 @@ protected static HiveSyncConfig getHiveSyncConfig(String basePath, String tableN
    *
    * @throws IOException
    */
-  private static void clearHiveDb() throws IOException {
+  private static void clearHiveDb() throws Exception {
     HiveConf hiveConf = new HiveConf();
     // Create Dummy hive sync config
     HiveSyncConfig hiveSyncConfig = getHiveSyncConfig("/dummy", "dummy");
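The widened throws clause follows from the sync client becoming AutoCloseable earlier in this patch: once the client is managed as a closeable resource, the checked Exception from close() has to be propagated or handled. A hedged sketch of the cleanup pattern; the helper name and body are illustrative, not the actual UtilitiesTestBase code.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;

class HiveCleanupSketch {
  // Illustrative only: try-with-resources guarantees the metastore connection is released,
  // and the throws clause mirrors the widened signature of clearHiveDb above.
  static void cleanTestDatabase(HiveSyncConfig syncConfig, HiveConf hiveConf, FileSystem fs) throws Exception {
    try (HoodieHiveClient client = new HoodieHiveClient(syncConfig, hiveConf, fs)) {
      if (client.databaseExists(syncConfig.databaseName)) {
        // database cleanup (e.g. dropping test tables) would go here
      }
    }
  }
}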