@@ -18,6 +18,11 @@

package org.apache.hudi.common.table;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaCompatibility;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -35,12 +40,6 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.InvalidTableException;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaCompatibility;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
@@ -85,7 +84,8 @@ private MessageType getTableParquetSchemaFromDataFile() throws Exception {
// If this is COW, get the last commit and read the schema from a file written in the
// last commit
HoodieInstant lastCommit =
activeTimeline.getCommitsTimeline().filterCompletedInstants().lastInstant().orElseThrow(() -> new InvalidTableException(metaClient.getBasePath()));
activeTimeline.getCommitsTimeline().filterCompletedInstantsWithCommitMetadata()
.lastInstant().orElseThrow(() -> new InvalidTableException(metaClient.getBasePath()));
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(activeTimeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class);
String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
@@ -97,8 +97,8 @@ private MessageType getTableParquetSchemaFromDataFile() throws Exception {
// If this is MOR, depending on whether the latest commit is a delta commit or
// compaction commit
// Get a datafile written and get the schema from that file
Option<HoodieInstant> lastCompactionCommit =
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
Option<HoodieInstant> lastCompactionCommit = metaClient.getActiveTimeline().getCommitTimeline()
.filterCompletedInstantsWithCommitMetadata().lastInstant();
LOG.info("Found the last compaction commit as " + lastCompactionCommit);

Option<HoodieInstant> lastDeltaCommit;
@@ -18,6 +18,8 @@

package org.apache.hudi.common.table.timeline;

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
@@ -100,6 +102,12 @@ public HoodieTimeline filterCompletedInstants() {
return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isCompleted), details);
}

@Override
public HoodieTimeline filterCompletedInstantsWithCommitMetadata() {
return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isCompleted)
.filter(i -> !isDeletePartitionType(i)), details);
}

@Override
public HoodieTimeline filterCompletedAndCompactionInstants() {
return new HoodieDefaultTimeline(instants.stream().filter(s -> s.isCompleted()
@@ -351,6 +359,21 @@ public Option<byte[]> getInstantDetails(HoodieInstant instant) {
return details.apply(instant);
}

@Override
public boolean isDeletePartitionType(HoodieInstant instant) {
Option<WriteOperationType> operationType;

try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(getInstantDetails(instant).get(), HoodieCommitMetadata.class);
operationType = Option.of(commitMetadata.getOperationType());
} catch (Exception e) {
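// If the commit metadata cannot be read or parsed, treat the instant as not a delete-partition commit.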
operationType = Option.empty();
}

return operationType.isPresent() && WriteOperationType.DELETE_PARTITION.equals(operationType.get());
}

@Override
public String toString() {
return this.getClass().getName() + ": " + instants.stream().map(Object::toString).collect(Collectors.joining(","));
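To make the intent of the new filter concrete, here is a minimal usage sketch (not part of the PR) that mirrors the TableSchemaResolver call sites above: it resolves the latest completed commit that carries usable commit metadata, skipping delete-partition instants. Class and method names are taken from the diff; anything else is illustrative.

```java
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.exception.InvalidTableException;

public class LatestSchemaCommitExample {

  // Returns the commit metadata of the latest completed commit that is not a delete-partition commit.
  public static HoodieCommitMetadata latestCommitMetadata(HoodieTableMetaClient metaClient) throws Exception {
    HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline()
        .filterCompletedInstantsWithCommitMetadata();

    HoodieInstant lastCommit = timeline.lastInstant()
        .orElseThrow(() -> new InvalidTableException(metaClient.getBasePath()));

    // The instant details hold the serialized commit metadata, including the written file paths,
    // which is what TableSchemaResolver reads the schema from.
    return HoodieCommitMetadata.fromBytes(
        timeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class);
  }
}
```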
@@ -131,6 +131,14 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline filterCompletedAndCompactionInstants();

/**
* Filter this timeline to include only completed instants, excluding instants whose
* operation type is delete partition.
*
* @return New instance of HoodieTimeline containing completed instants whose operation
* type is not delete partition
*/
HoodieTimeline filterCompletedInstantsWithCommitMetadata();

/**
* Timeline to just include commits (commit/deltacommit), compaction and replace actions.
*
@@ -281,6 +289,11 @@ public interface HoodieTimeline extends Serializable {
*/
Option<byte[]> getInstantDetails(HoodieInstant instant);

/**
* Check whether the given instant's WriteOperationType is DELETE_PARTITION.
*/
boolean isDeletePartitionType(HoodieInstant instant);

/**
* Helper methods to compare instants.
**/
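As a sanity check on the semantics of isDeletePartitionType above, the following sketch (not part of the PR) builds a HoodieCommitMetadata tagged with WriteOperationType.DELETE_PARTITION and round-trips it through the same fromBytes path the timeline uses. The toJsonString/getBytes serialization is an assumption about how commit metadata is persisted and may differ by version.

```java
import java.nio.charset.StandardCharsets;

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;

public class DeletePartitionMetadataExample {
  public static void main(String[] args) throws Exception {
    // Build commit metadata tagged as a delete-partition operation.
    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
    metadata.setOperationType(WriteOperationType.DELETE_PARTITION);

    // Assumed serialization: commit metadata is persisted as JSON bytes on the timeline.
    byte[] bytes = metadata.toJsonString().getBytes(StandardCharsets.UTF_8);

    // This mirrors the check isDeletePartitionType performs on an instant's details.
    HoodieCommitMetadata parsed = HoodieCommitMetadata.fromBytes(bytes, HoodieCommitMetadata.class);
    boolean isDeletePartition = WriteOperationType.DELETE_PARTITION.equals(parsed.getOperationType());
    System.out.println("isDeletePartition = " + isDeletePartition); // expected: true
  }
}
```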
@@ -18,6 +18,11 @@

package org.apache.hudi.hive;

import com.beust.jcommander.JCommander;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieTableType;
@@ -31,12 +36,6 @@
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.sync.common.AbstractSyncTool;

import com.beust.jcommander.JCommander;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.GroupType;
@@ -168,26 +167,22 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
// check if isDropPartition
boolean isDropPartition = hoodieHiveClient.isDropPartition();

// check if schemaChanged
boolean schemaChanged = false;

if (!isDropPartition) {
Contributor Author: This condition is the reason that manually executing HiveSyncTool does not update the schema.

// Get the parquet schema for this table looking at the latest commit
MessageType schema = hoodieHiveClient.getDataSchema();

// Currently HoodieBootstrapRelation does not support reading bootstrap MOR rt tables,
// so we disable syncAsSparkDataSourceTable here to avoid reading such tables
// through the data source path (which would use the HoodieBootstrapRelation).
// TODO after we support bootstrap MOR rt tables in HoodieBootstrapRelation [HUDI-2071], we can remove this logic.
if (hoodieHiveClient.isBootstrap()
&& hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
&& !readAsOptimized) {
cfg.syncAsSparkDataSourceTable = false;
}
// Sync schema if needed
schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
// Get the parquet schema for this table looking at the latest commit
MessageType schema = hoodieHiveClient.getDataSchema();

// Currently HoodieBootstrapRelation does not support reading bootstrap MOR rt tables,
// so we disable syncAsSparkDataSourceTable here to avoid reading such tables
// through the data source path (which would use the HoodieBootstrapRelation).
// TODO after we support bootstrap MOR rt tables in HoodieBootstrapRelation [HUDI-2071], we can remove this logic.
if (hoodieHiveClient.isBootstrap()
&& hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
&& !readAsOptimized) {
cfg.syncAsSparkDataSourceTable = false;
}

// Sync schema if needed
boolean schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);

LOG.info("Schema sync complete. Syncing partitions for " + tableName);
// Get the last time we successfully synced partitions
Option<String> lastCommitTimeSynced = Option.empty();
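For reference, this is a minimal sketch of the kind of manual HiveSyncTool run the review comment above refers to (and which previously skipped the schema update when the latest commit was a delete-partition commit). The HiveSyncConfig field values and the constructor shape are assumptions based on Hudi 0.x and may differ between versions; adjust to your environment.

```java
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;

public class ManualHiveSyncExample {
  public static void main(String[] args) {
    // Assumed HiveSyncConfig fields (Hudi 0.x); replace the values with your own.
    HiveSyncConfig cfg = new HiveSyncConfig();
    cfg.databaseName = "default";
    cfg.tableName = "hudi_trips";
    cfg.basePath = "hdfs:///tmp/hudi_trips";
    cfg.jdbcUrl = "jdbc:hive2://localhost:10000";
    cfg.hiveUser = "hive";
    cfg.hivePass = "";
    cfg.partitionFields = Collections.singletonList("dt");

    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
    HiveConf hiveConf = new HiveConf();
    hiveConf.addResource(fs.getConf());

    // With this change, schema sync runs even when the latest commit is a delete-partition commit.
    new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable();
  }
}
```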