diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index d1a4d9663754..0508c1bc4a81 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -18,6 +18,11 @@
 
 package org.apache.hudi.common.table;
 
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -35,12 +40,6 @@
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.InvalidTableException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.SchemaCompatibility;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
@@ -85,7 +84,8 @@ private MessageType getTableParquetSchemaFromDataFile() throws Exception {
         // If this is COW, get the last commit and read the schema from a file written in the
         // last commit
         HoodieInstant lastCommit =
-            activeTimeline.getCommitsTimeline().filterCompletedInstants().lastInstant().orElseThrow(() -> new InvalidTableException(metaClient.getBasePath()));
+            activeTimeline.getCommitsTimeline().filterCompletedInstantsWithCommitMetadata()
+                .lastInstant().orElseThrow(() -> new InvalidTableException(metaClient.getBasePath()));
         HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
             .fromBytes(activeTimeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class);
         String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
@@ -97,8 +97,8 @@ private MessageType getTableParquetSchemaFromDataFile() throws Exception {
         // If this is MOR, depending on whether the latest commit is a delta commit or
         // compaction commit
         // Get a datafile written and get the schema from that file
-        Option<HoodieInstant> lastCompactionCommit =
-            metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
+        Option<HoodieInstant> lastCompactionCommit = metaClient.getActiveTimeline().getCommitTimeline()
+            .filterCompletedInstantsWithCommitMetadata().lastInstant();
         LOG.info("Found the last compaction commit as " + lastCompactionCommit);
 
         Option<HoodieInstant> lastDeltaCommit;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index cfd77d12494a..aa48db79e7fd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.common.table.timeline;
 
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
@@ -100,6 +102,12 @@ public HoodieTimeline filterCompletedInstants() {
     return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isCompleted), details);
   }
 
+  @Override
+  public HoodieTimeline filterCompletedInstantsWithCommitMetadata() {
+    return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isCompleted)
+        .filter(i -> !isDeletePartitionType(i)), details);
+  }
+
   @Override
   public HoodieTimeline filterCompletedAndCompactionInstants() {
     return new HoodieDefaultTimeline(instants.stream().filter(s -> s.isCompleted()
@@ -351,6 +359,21 @@ public Option<byte[]> getInstantDetails(HoodieInstant instant) {
     return details.apply(instant);
   }
 
+  @Override
+  public boolean isDeletePartitionType(HoodieInstant instant) {
+    Option<WriteOperationType> operationType;
+
+    try {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+          .fromBytes(getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+      operationType = Option.of(commitMetadata.getOperationType());
+    } catch (Exception e) {
+      operationType = Option.empty();
+    }
+
+    return operationType.isPresent() && WriteOperationType.DELETE_PARTITION.equals(operationType.get());
+  }
+
   @Override
   public String toString() {
     return this.getClass().getName() + ": " + instants.stream().map(Object::toString).collect(Collectors.joining(","));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index d5d322384c19..bb6bed8280c6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -131,6 +131,14 @@ public interface HoodieTimeline extends Serializable {
    */
   HoodieTimeline filterCompletedAndCompactionInstants();
 
+  /**
+   * Filter this timeline to include only completed instants whose operation type is not delete partition.
+   *
+   * @return New instance of HoodieTimeline containing only completed instants whose
+   *         operation type is not delete partition
+   */
+  HoodieTimeline filterCompletedInstantsWithCommitMetadata();
+
   /**
    * Timeline to just include commits (commit/deltacommit), compaction and replace actions.
    *
@@ -281,6 +289,11 @@ public interface HoodieTimeline extends Serializable {
    */
   Option<byte[]> getInstantDetails(HoodieInstant instant);
 
+  /**
+   * Check whether the write operation type of the given instant is DELETE_PARTITION.
+   */
+  boolean isDeletePartitionType(HoodieInstant instant);
+
   /**
    * Helper methods to compare instants.
   **/
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index f07ab88c071e..f8b015714f95 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -18,6 +18,11 @@
 
 package org.apache.hudi.hive;
 
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -31,12 +36,6 @@
 import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
 import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
 import org.apache.hudi.sync.common.AbstractSyncTool;
-
-import com.beust.jcommander.JCommander;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.GroupType;
@@ -168,26 +167,22 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
     // check if isDropPartition
     boolean isDropPartition = hoodieHiveClient.isDropPartition();
 
-    // check if schemaChanged
-    boolean schemaChanged = false;
-
-    if (!isDropPartition) {
-      // Get the parquet schema for this table looking at the latest commit
-      MessageType schema = hoodieHiveClient.getDataSchema();
-
-      // Currently HoodieBootstrapRelation does support reading bootstrap MOR rt table,
-      // so we disable the syncAsSparkDataSourceTable here to avoid read such kind table
-      // by the data source way (which will use the HoodieBootstrapRelation).
-      // TODO after we support bootstrap MOR rt table in HoodieBootstrapRelation[HUDI-2071], we can remove this logical.
-      if (hoodieHiveClient.isBootstrap()
-          && hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
-          && !readAsOptimized) {
-        cfg.syncAsSparkDataSourceTable = false;
-      }
-
-      // Sync schema if needed
-      schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
+    // Get the parquet schema for this table looking at the latest commit
+    MessageType schema = hoodieHiveClient.getDataSchema();
+
+    // Currently HoodieBootstrapRelation does not support reading bootstrap MOR rt tables,
+    // so we disable syncAsSparkDataSourceTable here to avoid reading such tables
+    // through the data source path (which would use HoodieBootstrapRelation).
+    // TODO after we support bootstrap MOR rt tables in HoodieBootstrapRelation [HUDI-2071], we can remove this logic.
+    if (hoodieHiveClient.isBootstrap()
+        && hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
+        && !readAsOptimized) {
+      cfg.syncAsSparkDataSourceTable = false;
     }
+    // Sync schema if needed
+    boolean schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
+
     LOG.info("Schema sync complete. Syncing partitions for " + tableName);
     // Get the last time we successfully synced partitions
     Option<String> lastCommitTimeSynced = Option.empty();
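
Example (illustrative, not part of the patch above): a minimal sketch of how a caller might use the new filterCompletedInstantsWithCommitMetadata() API to read the latest commit metadata while skipping delete_partition instants, mirroring what TableSchemaResolver does in the first file. The class and helper method names in this sketch are hypothetical; only the timeline and metadata calls shown in the diff are assumed to exist.

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;

public class LatestCommitMetadataExample {

  // Hypothetical helper: returns the metadata of the last completed commit that is not a
  // delete_partition operation, so callers can safely derive schema or file paths from it.
  public static Option<HoodieCommitMetadata> latestNonDeletePartitionCommit(HoodieTableMetaClient metaClient) throws Exception {
    // New filter added in this change: completed instants, excluding delete_partition commits.
    HoodieTimeline timeline = metaClient.getActiveTimeline()
        .getCommitsTimeline()
        .filterCompletedInstantsWithCommitMetadata();

    Option<HoodieInstant> lastInstant = timeline.lastInstant();
    if (!lastInstant.isPresent()) {
      return Option.empty();
    }

    // Deserialize the commit metadata of the chosen instant, as TableSchemaResolver does above.
    HoodieCommitMetadata metadata = HoodieCommitMetadata
        .fromBytes(timeline.getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
    return Option.of(metadata);
  }
}

The same pattern is what keeps HiveSyncTool safe here: by filtering out delete_partition instants before schema resolution, the sync no longer needs the isDropPartition guard that previously skipped schema sync entirely.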