Merged
@@ -414,6 +414,25 @@ public Schema getLatestSchema(Schema writeSchema, boolean convertTableSchemaToAd
return latestSchema;
}


/**
* Get the last commit's metadata.
*/
public Option<HoodieCommitMetadata> getLatestCommitMetadata() {
Review comment (Member): don't we have a helper for this somewhere?

try {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
if (timeline.lastInstant().isPresent()) {
HoodieInstant instant = timeline.lastInstant().get();
byte[] data = timeline.getInstantDetails(instant).get();
return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
} else {
return Option.empty();
}
} catch (Exception e) {
throw new HoodieException("Failed to get commit metadata", e);
}
}
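
Later in this PR the sync flow calls hoodieHiveClient.isDropPartition() to detect whether the latest commit dropped partitions. A minimal sketch of how such a check could sit on top of getLatestCommitMetadata(), assuming HoodieCommitMetadata#getOperationType() and WriteOperationType.DELETE_PARTITION are available in this Hudi version (an illustration, not the PR's actual implementation):

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;

public class DropPartitionCheckSketch {

  // Hypothetical helper: returns true when the latest completed commit was written
  // by a DELETE_PARTITION operation, i.e. the commit that triggered this sync
  // dropped partitions rather than adding or updating data.
  public static boolean isDropPartition(Option<HoodieCommitMetadata> latestCommitMetadata) {
    return latestCommitMetadata.isPresent()
        && WriteOperationType.DELETE_PARTITION.equals(latestCommitMetadata.get().getOperationType());
  }
}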

/**
* Read the parquet schema from a parquet File.
*/
@@ -17,18 +17,19 @@

package org.apache.spark.sql.hudi.command

import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.util.PartitionPathEncodeUtils
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME

import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkSqlWriter}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
import org.apache.spark.sql.hudi.HoodieSqlUtils._
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}

case class AlterHoodieTableDropPartitionCommand(
tableIdentifier: TableIdentifier,
@@ -67,7 +68,8 @@ extends RunnableCommand {
val allPartitionPaths = hoodieCatalogTable.getAllPartitionPaths
val enableHiveStylePartitioning = isHiveStyledPartitioning(allPartitionPaths, table)
val enableEncodeUrl = isUrlEncodeEnabled(allPartitionPaths, table)
val partitionsToDelete = normalizedSpecs.map { spec =>
val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
val partitionsToDrop = normalizedSpecs.map { spec =>
hoodieCatalogTable.partitionFields.map{ partitionColumn =>
val encodedPartitionValue = if (enableEncodeUrl) {
PartitionPathEncodeUtils.escapePathName(spec(partitionColumn))
@@ -82,16 +84,26 @@
}.mkString("/")
}.mkString(",")

val enableHive = isEnableHive(sparkSession)
withSparkConf(sparkSession, Map.empty) {
Map(
"path" -> hoodieCatalogTable.tableLocation,
TBL_NAME.key -> hoodieCatalogTable.tableName,
TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
PARTITIONS_TO_DELETE.key -> partitionsToDelete,
PARTITIONS_TO_DELETE.key -> partitionsToDrop,
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
PARTITIONPATH_FIELD.key -> hoodieCatalogTable.partitionFields.mkString(",")
PARTITIONPATH_FIELD.key -> partitionFields,
HIVE_SYNC_ENABLED.key -> enableHive.toString,
META_SYNC_ENABLED.key -> enableHive.toString,
HIVE_SYNC_MODE.key -> HiveSyncMode.HMS.name(),
HIVE_USE_JDBC.key -> "false",
HIVE_DATABASE.key -> hoodieCatalogTable.table.identifier.database.getOrElse("default"),
HIVE_TABLE.key -> hoodieCatalogTable.table.identifier.table,
HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
HIVE_PARTITION_FIELDS.key -> partitionFields,
HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[MultiPartKeysValueExtractor].getCanonicalName
Review comment (Member): is this hardcoding of configs okay? is the user still able to override these with properties supplied at the SQL statement itself?

Reply (Contributor, author): Regarding this configuration, I will open a follow-up PR to handle it uniformly. As you said, if Spark SQL has Hive support enabled, this information can be taken from the SQL configuration instead of being hard-coded here.

)
}
}
@@ -287,6 +287,11 @@ public void updatePartitionsToTable(String tableName, List<String> changedPartit
}
}

@Override
public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
Review comment (Member): nit: just dropPartitions()?

throw new UnsupportedOperationException("Not support dropPartitionsToTable yet.");
}

public Map<List<String>, String> scanTablePartitions(String tableName) {
String sql = constructShowPartitionSQL(tableName);
Statement stmt = null;
@@ -28,7 +28,6 @@
import org.apache.hudi.hive.util.ConfigUtils;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.hive.util.Parquet2SparkSchemaUtils;

import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.sync.common.AbstractSyncTool;
@@ -166,20 +165,28 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
// Check if the necessary table exists
boolean tableExists = hoodieHiveClient.doesTableExist(tableName);

// Get the parquet schema for this table looking at the latest commit
MessageType schema = hoodieHiveClient.getDataSchema();

// Currently HoodieBootstrapRelation does not support reading bootstrap MOR rt tables,
// so we disable syncAsSparkDataSourceTable here to avoid reading such tables
// through the datasource path (which would use HoodieBootstrapRelation).
// TODO once bootstrap MOR rt tables are supported in HoodieBootstrapRelation [HUDI-2071], we can remove this logic.
if (hoodieHiveClient.isBootstrap()
&& hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
&& !readAsOptimized) {
cfg.syncAsSparkDataSourceTable = false;
// check whether the latest commit was a drop-partition operation
boolean isDropPartition = hoodieHiveClient.isDropPartition();

// track whether the schema has changed
boolean schemaChanged = false;

if (!isDropPartition) {
// Get the parquet schema for this table looking at the latest commit
MessageType schema = hoodieHiveClient.getDataSchema();

// Currently HoodieBootstrapRelation does not support reading bootstrap MOR rt tables,
// so we disable syncAsSparkDataSourceTable here to avoid reading such tables
// through the datasource path (which would use HoodieBootstrapRelation).
// TODO once bootstrap MOR rt tables are supported in HoodieBootstrapRelation [HUDI-2071], we can remove this logic.
if (hoodieHiveClient.isBootstrap()
&& hoodieHiveClient.getTableType() == HoodieTableType.MERGE_ON_READ
&& !readAsOptimized) {
cfg.syncAsSparkDataSourceTable = false;
}
// Sync schema if needed
schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);
}
// Sync schema if needed
boolean schemaChanged = syncSchema(tableName, tableExists, useRealtimeInputFormat, readAsOptimized, schema);

LOG.info("Schema sync complete. Syncing partitions for " + tableName);
// Get the last time we successfully synced partitions
@@ -192,7 +199,7 @@ protected void syncHoodieTable(String tableName, boolean useRealtimeInputFormat,
LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());

// Sync the partitions if needed
boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince);
boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, isDropPartition);
boolean meetSyncConditions = schemaChanged || partitionsChanged;
if (!cfg.isConditionalSync || meetSyncConditions) {
hoodieHiveClient.updateLastCommitTimeSynced(tableName);
@@ -331,19 +338,32 @@ private Map<String, String> getSparkSerdeProperties(boolean readAsOptimized) {
* Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the
* partition path does not match, it updates the partition path).
*/
private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince) {
private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince, boolean isDropPartition) {
boolean partitionsChanged;
try {
List<Partition> hivePartitions = hoodieHiveClient.scanTablePartitions(tableName);
List<PartitionEvent> partitionEvents =
hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, isDropPartition);

List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD);
LOG.info("New Partitions " + newPartitions);
hoodieHiveClient.addPartitionsToTable(tableName, newPartitions);
if (!newPartitions.isEmpty()) {
LOG.info("New Partitions " + newPartitions);
hoodieHiveClient.addPartitionsToTable(tableName, newPartitions);
}

List<String> updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE);
LOG.info("Changed Partitions " + updatePartitions);
hoodieHiveClient.updatePartitionsToTable(tableName, updatePartitions);
partitionsChanged = !updatePartitions.isEmpty() || !newPartitions.isEmpty();
if (!updatePartitions.isEmpty()) {
LOG.info("Changed Partitions " + updatePartitions);
hoodieHiveClient.updatePartitionsToTable(tableName, updatePartitions);
}

List<String> dropPartitions = filterPartitions(partitionEvents, PartitionEventType.DROP);
if (!dropPartitions.isEmpty()) {
LOG.info("Drop Partitions " + dropPartitions);
hoodieHiveClient.dropPartitionsToTable(tableName, dropPartitions);
}

partitionsChanged = !updatePartitions.isEmpty() || !newPartitions.isEmpty() || !dropPartitions.isEmpty();
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to sync partitions for table " + tableName, e);
}
@@ -122,6 +122,14 @@ public void updatePartitionsToTable(String tableName, List<String> changedPartit
ddlExecutor.updatePartitionsToTable(tableName, changedPartitions);
}

/**
* Drop the given partitions from the table.
*/
@Override
public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
ddlExecutor.dropPartitionsToTable(tableName, partitionsToDrop);
}

/**
* Update the table properties to the table.
*/
@@ -147,6 +155,14 @@ public void updateTableProperties(String tableName, Map<String, String> tablePro
* Generate a list of PartitionEvent based on the changes required.
*/
List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions) {
return getPartitionEvents(tablePartitions, partitionStoragePartitions, false);
}

/**
* Iterate over the storage partitions and generate a list of PartitionEvent based on the changes required:
* partitions to add or update, or, when isDropPartition is true, partitions to drop.
*/
List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, boolean isDropPartition) {
Map<String, String> paths = new HashMap<>();
for (Partition tablePartition : tablePartitions) {
List<String> hivePartitionValues = tablePartition.getValues();
@@ -161,12 +177,17 @@ List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<St
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
// Check if the partition values or if hdfs path is the same
List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
if (!storagePartitionValues.isEmpty()) {
String storageValue = String.join(", ", storagePartitionValues);
if (!paths.containsKey(storageValue)) {
events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
} else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));

if (isDropPartition) {
events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
} else {
if (!storagePartitionValues.isEmpty()) {
String storageValue = String.join(", ", storagePartitionValues);
if (!paths.containsKey(storageValue)) {
events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
} else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
}
}
}
}
@@ -81,5 +81,13 @@ public void createTable(String tableName, MessageType storageSchema, String inpu
*/
public void updatePartitionsToTable(String tableName, List<String> changedPartitions);

/**
* Drop partitions for a given table.
*
* @param tableName        name of the table from which the partitions are dropped
* @param partitionsToDrop list of partitions to drop
*/
public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop);

public void close();
}
@@ -226,6 +226,25 @@ public void updatePartitionsToTable(String tableName, List<String> changedPartit
}
}

@Override
public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
if (partitionsToDrop.isEmpty()) {
LOG.info("No partitions to drop for " + tableName);
return;
}

LOG.info("Drop partitions " + partitionsToDrop.size() + " on " + tableName);
try {
for (String dropPartition : partitionsToDrop) {
client.dropPartition(syncConfig.databaseName, tableName, dropPartition, false);
LOG.info("Drop partition " + dropPartition + " on " + tableName);
}
} catch (TException e) {
LOG.error(syncConfig.databaseName + "." + tableName + " drop partition failed", e);
throw new HoodieHiveSyncException(syncConfig.databaseName + "." + tableName + " drop partition failed", e);
}
}

@Override
public void close() {
if (client != null) {
@@ -126,6 +126,25 @@ public Map<String, String> getTableSchema(String tableName) {
}
}

@Override
public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
if (partitionsToDrop.isEmpty()) {
LOG.info("No partitions to drop for " + tableName);
return;
}

LOG.info("Drop partitions " + partitionsToDrop.size() + " on " + tableName);
try {
for (String dropPartition : partitionsToDrop) {
metaStoreClient.dropPartition(config.databaseName, tableName, dropPartition, false);
LOG.info("Drop partition " + dropPartition + " on " + tableName);
}
} catch (Exception e) {
LOG.error(config.databaseName + "." + tableName + " drop partition failed", e);
throw new HoodieHiveSyncException(config.databaseName + "." + tableName + " drop partition failed", e);
}
}

@Override
public void close() {
if (metaStoreClient != null) {
@@ -32,6 +32,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@@ -141,6 +142,13 @@ public Map<String, String> getTableSchema(String tableName) {
}
}

@Override
public void dropPartitionsToTable(String tableName, List<String> partitionsToDrop) {
partitionsToDrop.stream()
.map(partition -> String.format("ALTER TABLE `%s` DROP PARTITION (%s)", tableName, partition))
.forEach(this::runSQL);
}
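
For illustration, a hypothetical, self-contained example of the statement the format string above produces, assuming each entry in partitionsToDrop is already a Hive partition spec (as in the test later in this PR, which uses `datestr`='2050-01-01') rather than a raw storage path such as 2050/01/01; the table name here is made up:

public class DropPartitionSqlSketch {
  public static void main(String[] args) {
    String tableName = "hudi_tbl";                    // assumed table name
    String partitionSpec = "`datestr`='2050-01-01'";  // assumed Hive partition spec
    // Mirrors the format string used above; prints:
    // ALTER TABLE `hudi_tbl` DROP PARTITION (`datestr`='2050-01-01')
    System.out.println(String.format("ALTER TABLE `%s` DROP PARTITION (%s)", tableName, partitionSpec));
  }
}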

@Override
public void close() {
try {
@@ -737,6 +737,56 @@ public void testMultiPartitionKeySync(String syncMode) throws Exception {
assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size());
}

@ParameterizedTest
@MethodSource("syncMode")
public void testDropPartitionKeySync(String syncMode) throws Exception {
hiveSyncConfig.syncMode = syncMode;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 1, true);
HoodieHiveClient hiveClient =
new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
"Table " + hiveSyncConfig.tableName + " should not exist initially");
// Let's do the sync
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
// we need to renew the hiveClient after tool.syncHoodieTable(), because it closes the hive
// session and triggers a connection retry; an exception can be seen in the log.
hiveClient =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist after sync completes");
assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
hiveClient.getDataSchema().getColumns().size() + 1,
"Hive Schema should match the table schema + partition field");
assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"Table partitions should match the number of partitions we wrote");
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
"The last commit that was synced should be updated in the TBLPROPERTIES");

// Add new partitions
List<String> newPartition = Arrays.asList("2050/01/01");
hiveClient.addPartitionsToTable(hiveSyncConfig.tableName, Arrays.asList());
assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"No new partition should be added");
hiveClient.addPartitionsToTable(hiveSyncConfig.tableName, newPartition);
assertEquals(2, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"New partition should be added");

tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();

// Drop 1 partition.
ddlExecutor.runSQL("ALTER TABLE `" + hiveSyncConfig.tableName
+ "` DROP PARTITION (`datestr`='2050-01-01')");

hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
List<Partition> hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
assertEquals(1, hivePartitions.size(),
"Table should have 1 partition because of the drop 1 partition");
}

@ParameterizedTest
@MethodSource("syncMode")
public void testNonPartitionedSync(String syncMode) throws Exception {