2 changes: 1 addition & 1 deletion docker/demo/setup_demo_container.sh
@@ -21,4 +21,4 @@ cp /var/hoodie/ws/docker/demo/config/log4j.properties $SPARK_CONF_DIR/.
 hadoop fs -mkdir -p /var/demo/
 hadoop fs -mkdir -p /tmp/spark-events
 hadoop fs -copyFromLocal -f /var/hoodie/ws/docker/demo/config /var/demo/.
-chmod +x /var/hoodie/ws/hudi-hive-sync/run_sync_tool.sh
+chmod +x /var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh
5 changes: 5 additions & 0 deletions hudi-spark/pom.xml
@@ -186,6 +186,11 @@
       <artifactId>hudi-hive-sync</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-sync-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
 
     <!-- Logging -->
     <dependency>
@@ -19,6 +19,7 @@ package org.apache.hudi

 import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
+import org.apache.hudi.hive.HiveSyncTool
 import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
 import org.apache.hudi.keygen.SimpleKeyGenerator
 import org.apache.log4j.LogManager
@@ -251,11 +252,14 @@ object DataSourceWriteOptions
    */
   val STREAMING_IGNORE_FAILED_BATCH_OPT_KEY = "hoodie.datasource.write.streaming.ignore.failed.batch"
   val DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL = "true"
+  val META_SYNC_CLIENT_TOOL_CLASS = "hoodie.meta.sync.client.tool.class"
+  val DEFAULT_META_SYNC_CLIENT_TOOL_CLASS = classOf[HiveSyncTool].getName
 
   // HIVE SYNC SPECIFIC CONFIGS
   //NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes
   // unexpected issues with config getting reset
   val HIVE_SYNC_ENABLED_OPT_KEY = "hoodie.datasource.hive_sync.enable"
+  val META_SYNC_ENABLED_OPT_KEY = "hoodie.datasource.meta.sync.enable"
   val HIVE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.database"
   val HIVE_TABLE_OPT_KEY = "hoodie.datasource.hive_sync.table"
   val HIVE_BASE_FILE_FORMAT_OPT_KEY = "hoodie.datasource.hive_sync.base_file_format"
@@ -270,6 +274,7 @@

   // DEFAULT FOR HIVE SPECIFIC CONFIGS
   val DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL = "false"
+  val DEFAULT_META_SYNC_ENABLED_OPT_VAL = "false"
   val DEFAULT_HIVE_DATABASE_OPT_VAL = "default"
   val DEFAULT_HIVE_TABLE_OPT_VAL = "unknown"
   val DEFAULT_HIVE_BASE_FILE_FORMAT_OPT_VAL = "PARQUET"
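
Together these additions generalize the old Hive-only switch: hoodie.datasource.meta.sync.enable turns on metadata sync for any catalog, and hoodie.meta.sync.client.tool.class (defaulting to HiveSyncTool) names the tool or tools to run. A minimal sketch of a Spark write using the new keys follows; the DataFrame `df`, table name, record key field, and output path are placeholders, not part of this PR:

// Hypothetical write illustrating the new meta-sync options.
df.write.format("hudi").
  option("hoodie.table.name", "trips").
  option("hoodie.datasource.write.recordkey.field", "uuid").
  // New generic switch; the Hive-specific hoodie.datasource.hive_sync.enable
  // is still honored for backwards compatibility.
  option("hoodie.datasource.meta.sync.enable", "true").
  // Comma-separated AbstractSyncTool implementations; defaults to HiveSyncTool.
  option("hoodie.meta.sync.client.tool.class", "org.apache.hudi.hive.HiveSyncTool").
  mode("append").
  save("/tmp/hudi/trips")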
@@ -18,6 +18,7 @@
 package org.apache.hudi
 
 import java.util
+import java.util.Properties
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
@@ -32,9 +33,11 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.util.ReflectionUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.sync.common.AbstractSyncTool
 import org.apache.log4j.LogManager
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
@@ -240,7 +243,10 @@ private[hudi] object HoodieSparkSqlWriter
       STREAMING_RETRY_CNT_OPT_KEY -> DEFAULT_STREAMING_RETRY_CNT_OPT_VAL,
       STREAMING_RETRY_INTERVAL_MS_OPT_KEY -> DEFAULT_STREAMING_RETRY_INTERVAL_MS_OPT_VAL,
       STREAMING_IGNORE_FAILED_BATCH_OPT_KEY -> DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL,
+      META_SYNC_CLIENT_TOOL_CLASS -> DEFAULT_META_SYNC_CLIENT_TOOL_CLASS,
+      // just for backwards compatibility
       HIVE_SYNC_ENABLED_OPT_KEY -> DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL,
+      META_SYNC_ENABLED_OPT_KEY -> DEFAULT_META_SYNC_ENABLED_OPT_VAL,
       HIVE_DATABASE_OPT_KEY -> DEFAULT_HIVE_DATABASE_OPT_VAL,
       HIVE_TABLE_OPT_KEY -> DEFAULT_HIVE_TABLE_OPT_VAL,
       HIVE_BASE_FILE_FORMAT_OPT_KEY -> DEFAULT_HIVE_BASE_FILE_FORMAT_OPT_VAL,
@@ -287,6 +293,44 @@ private[hudi] object HoodieSparkSqlWriter
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(META_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClassSet = scala.collection.mutable.Set[String]()
+    parameters(META_SYNC_CLIENT_TOOL_CLASS).split(",").foreach(syncClass => syncClientToolClassSet += syncClass)
+
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClassSet += classOf[HiveSyncTool].getName
+    }
+    var metaSyncSuccess = true
+    if (metaSyncEnabled) {
+      val fs = basePath.getFileSystem(hadoopConf)
+      syncClientToolClassSet.foreach(impl => {
+        val syncSuccess = impl.trim match {
+          case "org.apache.hudi.hive.HiveSyncTool" => {
+            log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+            syncHive(basePath, fs, parameters)
+            true
+          }
+          case _ => {
+            val properties = new Properties()
+            properties.putAll(parameters)
+            properties.put("basePath", basePath.toString)
+            val syncHoodie = ReflectionUtils.loadClass(impl.trim, Array[Class[_]](classOf[Properties], classOf[FileSystem]), properties, fs).asInstanceOf[AbstractSyncTool]
+            syncHoodie.syncHoodieTable()
+            true
+          }
+        }
+        metaSyncSuccess = metaSyncSuccess && syncSuccess
+      })
+    }
+    metaSyncSuccess
+  }
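
The non-Hive branch above instantiates each configured tool reflectively through a (Properties, FileSystem) constructor and calls syncHoodieTable(). As a rough sketch, assuming AbstractSyncTool's own constructor mirrors that (Properties, FileSystem) pair (the package and class names below are illustrative, not part of this PR), a pluggable sync tool could look like:

package com.example.sync

import java.util.Properties

import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.sync.common.AbstractSyncTool

// Hypothetical custom catalog sync tool, wired in via
// hoodie.meta.sync.client.tool.class. The (Properties, FileSystem)
// constructor shape matches what metaSync's ReflectionUtils call expects.
class MyCatalogSyncTool(props: Properties, fs: FileSystem)
  extends AbstractSyncTool(props, fs) {

  // Invoked once per successful commit from HoodieSparkSqlWriter.metaSync.
  override def syncHoodieTable(): Unit = {
    val basePath = props.getProperty("basePath") // injected by metaSync above
    // Push table and partition metadata for basePath to the external catalog here.
  }
}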

   private def commitAndPerformPostOperations(writeStatuses: JavaRDD[WriteStatus],
                                              parameters: Map[String, String],
                                              client: HoodieWriteClient[HoodieRecordPayload[Nothing]],
@@ -323,20 +367,13 @@
       }
 
       log.info(s"Compaction Scheduled is $compactionInstant")
-      val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
-      val syncHiveSucess = if (hiveSyncEnabled) {
-        log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
-        val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
-        syncHive(basePath, fs, parameters)
-      } else {
-        true
-      }
+      val metaSyncSuccess = metaSync(parameters, basePath, jsc.hadoopConfiguration())
 
       log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
       if (!asyncCompactionEnabled) {
         client.close()
       }
-      (commitSuccess && syncHiveSucess, compactionInstant)
+      (commitSuccess && metaSyncSuccess, compactionInstant)
     } else {
       log.error(s"$operation failed with $errorCount errors :")
       if (log.isTraceEnabled) {
171 changes: 171 additions & 0 deletions hudi-sync/hudi-dla-sync/pom.xml
@@ -0,0 +1,171 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <parent>
    <artifactId>hudi</artifactId>
    <groupId>org.apache.hudi</groupId>
    <version>0.6.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

  <modelVersion>4.0.0</modelVersion>

  <artifactId>hudi-dla-sync</artifactId>
  <packaging>jar</packaging>

  <properties>
    <hadoop.aliyun.version>3.2.1</hadoop.aliyun.version>
    <mysql.connector.java.version>5.1.47</mysql.connector.java.version>
    <aliyun.sdk.oss.version>3.1.0</aliyun.sdk.oss.version>
    <aliyun.java.sdk.core.version>3.7.1</aliyun.java.sdk.core.version>
  </properties>

  <dependencies>
    <!-- Hoodie -->
    <dependency>
      <groupId>org.apache.hudi</groupId>
      <artifactId>hudi-common</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hudi</groupId>
      <artifactId>hudi-sync-common</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hudi</groupId>
      <artifactId>hudi-hive-sync</artifactId>
      <version>${project.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.hive</groupId>
          <artifactId>hive-shims</artifactId>
        </exclusion>
        <exclusion>
          <groupId>commons-lang</groupId>
          <artifactId>commons-lang</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>${mysql.connector.java.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-aliyun</artifactId>
      <version>${hadoop.aliyun.version}</version>
    </dependency>

    <dependency>
      <groupId>com.aliyun.oss</groupId>
      <artifactId>aliyun-sdk-oss</artifactId>
      <version>${aliyun.sdk.oss.version}</version>
    </dependency>

    <dependency>
      <groupId>com.aliyun</groupId>
      <artifactId>aliyun-java-sdk-core</artifactId>
      <version>${aliyun.java.sdk.core.version}</version>
    </dependency>

    <!-- Logging -->
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
    </dependency>

    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-avro</artifactId>
    </dependency>

    <!-- Hadoop -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>${hive.version}</version>
      <exclusions>
        <exclusion>
          <groupId>commons-lang</groupId>
          <artifactId>commons-lang</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-lang3</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-api</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <resources>
      <resource>
        <directory>src/main/resources</directory>
      </resource>
    </resources>
    <plugins>
      <plugin>
        <groupId>org.apache.rat</groupId>
        <artifactId>apache-rat-plugin</artifactId>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>${maven-jar-plugin.version}</version>
        <executions>
          <execution>
            <goals>
              <goal>test-jar</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.jacoco</groupId>
        <artifactId>jacoco-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>
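
Because metaSync splits hoodie.meta.sync.client.tool.class on commas, the tool shipped by this new module can run alongside Hive sync in a single write. A short hedged sketch; the fully qualified class name org.apache.hudi.dla.DLASyncTool is an assumption about this module and does not appear in this diff, and `df` and `basePath` are placeholders:

// Hypothetical: running Hive sync and the DLA sync tool together.
df.write.format("hudi").
  option("hoodie.datasource.meta.sync.enable", "true").
  option("hoodie.meta.sync.client.tool.class",
    "org.apache.hudi.hive.HiveSyncTool,org.apache.hudi.dla.DLASyncTool").
  mode("append").
  save(basePath)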