5 changes: 5 additions & 0 deletions hudi-spark/pom.xml
@@ -186,6 +186,11 @@
<artifactId>hudi-hive-sync</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-sync-common</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Logging -->
<dependency>
@@ -250,6 +250,7 @@ object DataSourceWriteOptions {
*/
val STREAMING_IGNORE_FAILED_BATCH_OPT_KEY = "hoodie.datasource.write.streaming.ignore.failed.batch"
val DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL = "true"
val SYNC_CLIENT_TOOL_CLASS = "hoodie.sync.client.tool.class"

// HIVE SYNC SPECIFIC CONFIGS
//NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes
@@ -18,6 +18,7 @@
package org.apache.hudi

import java.util
import java.util.Properties

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
@@ -30,9 +31,11 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecordPayload
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.util.ReflectionUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
import org.apache.hudi.sync.common.AbstractSyncTool
import org.apache.log4j.LogManager
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
@@ -289,6 +292,24 @@ private[hudi] object HoodieSparkSqlWriter {
} else {
true
}
val clientImpls = parameters.get(SYNC_CLIENT_TOOL_CLASS).getOrElse("")
log.info(s"impls is $clientImpls")
if (!clientImpls.isEmpty) {
val impls = clientImpls.split(",")
impls.foreach(impl => {
if (!impl.trim.contains(classOf[HiveSyncTool].getName)) {
val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
val properties = new Properties();
properties.putAll(parameters)
properties.put("basePath", basePath.toString)
val syncHoodie = ReflectionUtils.loadClass(impl.trim, Array[Class[_]](classOf[Properties], classOf[FileSystem]), properties, fs).asInstanceOf[AbstractSyncTool]
syncHoodie.syncHoodieTable()
true
} else {
log.warn("please use hoodie.datasource.hive_sync.enable to sync to hive")
}
})
}
client.close()
commitSuccess && syncHiveSucess
} else {
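For reference, a minimal usage sketch of the new hoodie.sync.client.tool.class option wired in above (not part of this PR). The table name, paths, and com.example.MyCustomSyncTool are hypothetical; the only assumptions taken from the diff are the option key and that the tool extends AbstractSyncTool with a (Properties, FileSystem) constructor, matching the ReflectionUtils.loadClass call:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object CustomSyncToolUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-custom-sync-sketch")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Illustrative data; column names are arbitrary.
    val df = Seq(("id1", "2020/06/01", 1000L), ("id2", "2020/06/02", 1001L))
      .toDF("uuid", "partition", "ts")

    df.write
      .format("org.apache.hudi")
      .option("hoodie.table.name", "my_table")
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.partitionpath.field", "partition")
      .option("hoodie.datasource.write.precombine.field", "ts")
      // New in this PR: comma-separated sync tool classes, loaded reflectively
      // after a successful commit. The class below is hypothetical.
      .option("hoodie.sync.client.tool.class", "com.example.MyCustomSyncTool")
      .mode(SaveMode.Append)
      .save("/tmp/hudi/my_table")

    spark.stop()
  }
}
```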
7 changes: 7 additions & 0 deletions hudi-hive-sync/pom.xml → hudi-sync/hudi-hive-sync/pom.xml
@@ -20,7 +20,9 @@
<artifactId>hudi</artifactId>
<groupId>org.apache.hudi</groupId>
<version>0.6.0-SNAPSHOT</version>
<relativePath>../../hudi</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>

<artifactId>hudi-hive-sync</artifactId>
@@ -43,6 +45,11 @@
<artifactId>hudi-hadoop-mr</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-sync-common</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Logging -->
<dependency>
@@ -23,15 +23,16 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.InvalidTableException;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.hive.util.HiveSchemaUtil;

import com.beust.jcommander.JCommander;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hudi.sync.common.AbstractSyncTool;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
@@ -49,7 +50,7 @@
* partitions incrementally (all the partitions modified since the last commit)
*/
@SuppressWarnings("WeakerAccess")
public class HiveSyncTool {
public class HiveSyncTool extends AbstractSyncTool {

private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class);
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
@@ -61,6 +62,7 @@ public class HiveSyncTool {
private final Option<String> roTableTableName;

public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
super(configuration.getAllProperties(), fs);
this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
this.cfg = cfg;
// Set partitionFields to empty, when the NonPartitionedExtractor is used
@@ -84,6 +86,7 @@ public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
}
}

@Override
public void syncHoodieTable() {
try {
switch (hoodieHiveClient.getTableType()) {
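Because HiveSyncTool now extends AbstractSyncTool, third-party catalog integrations can follow the same shape. Below is a hedged sketch of such a tool, assuming only what the diff shows: a (Properties, FileSystem) super constructor and an abstract syncHoodieTable(). The class name and body are illustrative, not part of this PR:

```scala
import java.util.Properties

import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.sync.common.AbstractSyncTool
import org.apache.log4j.LogManager

// Hypothetical sync tool; mirrors the constructor shape HiveSyncTool uses in
// super(configuration.getAllProperties(), fs) and the reflective instantiation
// in HoodieSparkSqlWriter.
class MyCustomSyncTool(props: Properties, fs: FileSystem)
    extends AbstractSyncTool(props, fs) {

  private val log = LogManager.getLogger(classOf[MyCustomSyncTool])

  override def syncHoodieTable(): Unit = {
    // HoodieSparkSqlWriter puts the table base path into the properties under
    // the "basePath" key before constructing the tool (see diff above).
    val basePath = props.getProperty("basePath")
    log.info(s"Syncing Hudi table at $basePath to an external catalog")
    // ... register the table and its partitions with the target catalog ...
  }
}
```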
@@ -20,14 +20,9 @@

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hive.util.HiveSchemaUtil;

import org.apache.hadoop.fs.FileSystem;
@@ -43,6 +38,7 @@
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.jdbc.HiveDriver;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
@@ -62,7 +58,7 @@
import java.util.Map;
import java.util.stream.Collectors;

public class HoodieHiveClient {
public class HoodieHiveClient extends AbstractSyncHoodieClient {

private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "last_commit_time_sync";
// Make sure we have the hive JDBC driver in classpath
@@ -78,8 +74,6 @@ public class HoodieHiveClient {
}

private static final Logger LOG = LogManager.getLogger(HoodieHiveClient.class);
private final HoodieTableMetaClient metaClient;
private final HoodieTableType tableType;
private final PartitionValueExtractor partitionValueExtractor;
private IMetaStoreClient client;
private HiveSyncConfig syncConfig;
@@ -89,10 +83,9 @@
private HiveConf configuration;

public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
super(cfg.basePath, cfg.assumeDatePartitioning, fs);
this.syncConfig = cfg;
this.fs = fs;
this.metaClient = new HoodieTableMetaClient(fs.getConf(), cfg.basePath, true);
this.tableType = metaClient.getTableType();

this.configuration = configuration;
// Support both JDBC and metastore based implementations for backwards compatibility. Future users should
@@ -125,7 +118,8 @@ public HoodieTimeline getActiveTimeline() {
/**
* Add the (NEW) partitions to the table.
*/
void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
@Override
public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
if (partitionsToAdd.isEmpty()) {
LOG.info("No partitions to add for " + tableName);
return;
@@ -138,7 +132,8 @@ void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
/**
* Partition path has changed - update the path for the following partitions.
*/
void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
@Override
public void updatePartitionsToTable(String tableName, List<String> changedPartitions) {
if (changedPartitions.isEmpty()) {
LOG.info("No partitions to change for " + tableName);
return;
@@ -258,7 +253,8 @@ void updateTableDefinition(String tableName, MessageType newSchema) {
}
}

void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass) {
@Override
public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass) {
try {
String createSQLQuery =
HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, syncConfig, inputFormatClass, outputFormatClass, serdeClass);
@@ -272,6 +268,7 @@ void createTable(String tableName, MessageType storageSchema, String inputFormat
/**
* Get the table schema.
*/
// TODO: decide whether this should be declared in AbstractSyncHoodieClient and marked @Override
public Map<String, String> getTableSchema(String tableName) {
if (syncConfig.useJdbc) {
if (!doesTableExist(tableName)) {
@@ -327,24 +324,10 @@ public Map<String, String> getTableSchemaUsingMetastoreClient(String tableName)
}
}

/**
* Gets the schema for a hoodie table. Depending on the type of table, try to read schema from commit metadata if
* present, else fallback to reading from any file written in the latest commit. We will assume that the schema has
* not changed within a single atomic write.
*
* @return Parquet schema for this table
*/
public MessageType getDataSchema() {
try {
return new TableSchemaResolver(metaClient).getTableParquetSchema();
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to read data schema", e);
}
}

/**
* @return true if the configured table exists
*/
@Override
public boolean doesTableExist(String tableName) {
try {
return client.tableExists(syncConfig.databaseName, tableName);
@@ -455,36 +438,7 @@ private String getHiveJdbcUrlWithDefaultDBName() {
return hiveJdbcUrl + (urlAppend == null ? "" : urlAppend);
}

private static void closeQuietly(ResultSet resultSet, Statement stmt) {
try {
if (stmt != null) {
stmt.close();
}
} catch (SQLException e) {
LOG.error("Could not close the statement opened ", e);
}

try {
if (resultSet != null) {
resultSet.close();
}
} catch (SQLException e) {
LOG.error("Could not close the resultset opened ", e);
}
}

public String getBasePath() {
return metaClient.getBasePath();
}

HoodieTableType getTableType() {
return tableType;
}

public FileSystem getFs() {
return fs;
}

@Override
public Option<String> getLastCommitTimeSynced(String tableName) {
// Get the last commit time from the TBLproperties
try {
@@ -509,33 +463,12 @@ public void close() {
}
}

List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
if (!lastCommitTimeSynced.isPresent()) {
LOG.info("Last commit time synced is not known, listing all partitions in " + syncConfig.basePath + ",FS :" + fs);
try {
return FSUtils.getAllPartitionPaths(fs, syncConfig.basePath, syncConfig.assumeDatePartitioning);
} catch (IOException e) {
throw new HoodieIOException("Failed to list all partitions in " + syncConfig.basePath, e);
}
} else {
LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");

HoodieTimeline timelineToSync = activeTimeline.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE);
return timelineToSync.getInstants().map(s -> {
try {
return HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(s).get(), HoodieCommitMetadata.class);
} catch (IOException e) {
throw new HoodieIOException("Failed to get partitions written since " + lastCommitTimeSynced, e);
}
}).flatMap(s -> s.getPartitionToWriteStats().keySet().stream()).distinct().collect(Collectors.toList());
}
}

List<String> getAllTables(String db) throws Exception {
return client.getAllTables(db);
}

void updateLastCommitTimeSynced(String tableName) {
@Override
public void updateLastCommitTimeSynced(String tableName) {
// Set the last commit time from the TBLproperties
String lastCommitSynced = activeTimeline.lastInstant().get().getTimestamp();
try {
@@ -546,30 +479,4 @@ void updateLastCommitTimeSynced(String tableName) {
throw new HoodieHiveSyncException("Failed to get update last commit time synced to " + lastCommitSynced, e);
}
}

/**
* Partition Event captures any partition that needs to be added or updated.
*/
static class PartitionEvent {

public enum PartitionEventType {
ADD, UPDATE
}

PartitionEventType eventType;
String storagePartition;

PartitionEvent(PartitionEventType eventType, String storagePartition) {
this.eventType = eventType;
this.storagePartition = storagePartition;
}

static PartitionEvent newPartitionAddEvent(String storagePartition) {
return new PartitionEvent(PartitionEventType.ADD, storagePartition);
}

static PartitionEvent newPartitionUpdateEvent(String storagePartition) {
return new PartitionEvent(PartitionEventType.UPDATE, storagePartition);
}
}
}
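The PartitionEvent class and its factory methods deleted above now live in AbstractSyncHoodieClient, per the new imports in HiveSyncTool and the tests. A small sketch of how those events are typically consumed, assuming the factories and fields moved over with public visibility (which the cross-package usage requires):

```scala
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType

object PartitionEventSketch {
  def main(args: Array[String]): Unit = {
    // Events as a sync tool might build them from storage vs. catalog state.
    val events = List(
      PartitionEvent.newPartitionAddEvent("2020/06/01"),
      PartitionEvent.newPartitionUpdateEvent("2020/06/02"))

    // Split by type before calling addPartitionsToTable / updatePartitionsToTable
    // on the sync client, as HiveSyncTool does.
    val toAdd = events.filter(_.eventType == PartitionEventType.ADD).map(_.storagePartition)
    val toUpdate = events.filter(_.eventType == PartitionEventType.UPDATE).map(_.storagePartition)
    println(s"partitions to add: $toAdd, partitions to update: $toUpdate")
  }
}
```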
@@ -20,12 +20,14 @@

import org.apache.parquet.schema.MessageType;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;


/**
* Represents the schema difference between the storage schema and hive table schema.
@@ -91,7 +93,7 @@ public Builder(MessageType storageSchema, Map<String, String> tableSchema) {
this.tableSchema = tableSchema;
deleteColumns = new ArrayList<>();
updateColumnTypes = new HashMap<>();
addColumnTypes = new HashMap<>();
addColumnTypes = new LinkedHashMap<>();
}

public Builder deleteTableColumn(String column) {
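The switch from HashMap to LinkedHashMap for addColumnTypes presumably makes the added-column ordering deterministic (insertion order), which matters once the column set is turned into DDL. A standalone illustration of the difference, independent of the Hudi classes:

```scala
import java.util.{HashMap => JHashMap, LinkedHashMap => JLinkedHashMap}

object ColumnOrderSketch {
  def main(args: Array[String]): Unit = {
    val hashed = new JHashMap[String, String]()
    val linked = new JLinkedHashMap[String, String]()
    // Insert columns in the order they appear in the storage schema.
    for ((col, hiveType) <- Seq("zip" -> "string", "age" -> "int", "city" -> "string")) {
      hashed.put(col, hiveType)
      linked.put(col, hiveType)
    }
    // LinkedHashMap iterates in insertion order (zip, age, city);
    // HashMap iteration order is unspecified and may differ.
    println(s"HashMap keys:       ${hashed.keySet()}")
    println(s"LinkedHashMap keys: ${linked.keySet()}")
  }
}
```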
@@ -21,8 +21,8 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.hive.util.HiveSchemaUtil;
