@@ -293,6 +293,8 @@ public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String b
         DataSourceWriteOptions.DEFAULT_HIVE_USE_JDBC_OPT_VAL()));
     hiveSyncConfig.autoCreateDatabase = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE_OPT_KEY(),
         DataSourceWriteOptions.DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY()));
+    hiveSyncConfig.ignoreExceptions = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS_OPT_KEY(),
+        DataSourceWriteOptions.DEFAULT_HIVE_IGNORE_EXCEPTIONS_OPT_KEY()));
     hiveSyncConfig.skipROSuffix = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SKIP_RO_SUFFIX(),
         DataSourceWriteOptions.DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL()));
     hiveSyncConfig.supportTimestamp = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_SUPPORT_TIMESTAMP(),
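For callers that configure sync through `TypedProperties`, `buildHiveSyncConfig` reads the new key like its neighbors, falling back to the default when the key is absent. A small sketch of the property a caller would set — the `TypedProperties` import path is an assumption, since the class has moved between Hudi versions:

// Hedged sketch: the property consumed by buildHiveSyncConfig above.
// TypedProperties' package differs across Hudi versions (common.util vs common.config).
import org.apache.hudi.common.config.TypedProperties

val props = new TypedProperties()
props.setProperty("hoodie.datasource.hive_sync.ignore_exceptions", "true")
// When the key is absent, buildHiveSyncConfig falls back to
// DEFAULT_HIVE_IGNORE_EXCEPTIONS_OPT_KEY, i.e. "false".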
@@ -347,6 +347,7 @@ object DataSourceWriteOptions {
   val HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY = "hoodie.datasource.hive_sync.use_pre_apache_input_format"
   val HIVE_USE_JDBC_OPT_KEY = "hoodie.datasource.hive_sync.use_jdbc"
   val HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.auto_create_database"
+  val HIVE_IGNORE_EXCEPTIONS_OPT_KEY = "hoodie.datasource.hive_sync.ignore_exceptions"
   val HIVE_SKIP_RO_SUFFIX = "hoodie.datasource.hive_sync.skip_ro_suffix"
   val HIVE_SUPPORT_TIMESTAMP = "hoodie.datasource.hive_sync.support_timestamp"

@@ -365,6 +366,7 @@ object DataSourceWriteOptions {
   val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false"
   val DEFAULT_HIVE_USE_JDBC_OPT_VAL = "true"
   val DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "true"
+  val DEFAULT_HIVE_IGNORE_EXCEPTIONS_OPT_KEY = "false"
   val DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL = "false"
   val DEFAULT_HIVE_SUPPORT_TIMESTAMP = "false"

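On the Spark datasource path, the key above is what a writer would set alongside the usual hive-sync options. A minimal sketch of enabling the new flag on a write — the SparkSession, data, table name, and path are placeholders, and a real Hudi write also needs the usual record-key/precombine options:

// Hedged sketch: enabling the new flag on a Spark datasource write.
// All concrete values (app name, table name, target path) are placeholders.
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("hive-sync-sketch").getOrCreate()
val df = spark.range(10).toDF("key") // placeholder data

df.write
  .format("hudi")
  .option("hoodie.table.name", "my_table")
  .option("hoodie.datasource.hive_sync.enable", "true")
  // New in this PR: tolerate hive-sync failures instead of failing the write.
  .option("hoodie.datasource.hive_sync.ignore_exceptions", "true")
  .mode(SaveMode.Append)
  .save("/tmp/hudi/my_table")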
@@ -374,6 +374,7 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
     hiveSyncConfig.useFileListingFromMetadata = parameters(HoodieMetadataConfig.METADATA_ENABLE_PROP).toBoolean
     hiveSyncConfig.verifyMetadataFileListing = parameters(HoodieMetadataConfig.METADATA_VALIDATE_PROP).toBoolean
+    hiveSyncConfig.ignoreExceptions = parameters.get(HIVE_IGNORE_EXCEPTIONS_OPT_KEY).exists(r => r.toBoolean)
     hiveSyncConfig.supportTimestamp = parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean)
     hiveSyncConfig.autoCreateDatabase = parameters.get(HIVE_AUTO_CREATE_DATABASE_OPT_KEY).exists(r => r.toBoolean)
     hiveSyncConfig.decodePartition = parameters.getOrElse(URL_ENCODE_PARTITIONING_OPT_KEY,
@@ -76,6 +76,9 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--auto-create-database"}, description = "Auto create hive database")
   public Boolean autoCreateDatabase = true;
 
+  @Parameter(names = {"--ignore-exceptions"}, description = "Ignore hive exceptions")
+  public Boolean ignoreExceptions = false;
+
   @Parameter(names = {"--skip-ro-suffix"}, description = "Skip the `_ro` suffix for Read optimized table, when registering")
   public Boolean skipROSuffix = false;

@@ -130,6 +133,7 @@ public String toString() {
+ ", usePreApacheInputFormat=" + usePreApacheInputFormat
+ ", useJdbc=" + useJdbc
+ ", autoCreateDatabase=" + autoCreateDatabase
+ ", ignoreExceptions=" + ignoreExceptions
+ ", skipROSuffix=" + skipROSuffix
+ ", help=" + help
+ ", supportTimestamp=" + supportTimestamp
@@ -58,56 +58,72 @@ public class HiveSyncTool extends AbstractSyncTool {
   public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
 
   private final HiveSyncConfig cfg;
-  private final HoodieHiveClient hoodieHiveClient;
-  private final String snapshotTableName;
-  private final Option<String> roTableTableName;
+  private HoodieHiveClient hoodieHiveClient = null;
+  private String snapshotTableName = null;
+  private Option<String> roTableName = null;
 
   public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
     super(configuration.getAllProperties(), fs);
-    this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
+
+    try {
+      this.hoodieHiveClient = new HoodieHiveClient(cfg, configuration, fs);
+    } catch (RuntimeException e) {
+      if (cfg.ignoreExceptions) {
+        LOG.error("Got runtime exception when hive syncing, but continuing as ignoreExceptions config is set ", e);
+      } else {
+        throw new HoodieHiveSyncException("Got runtime exception when hive syncing", e);
+      }
+    }
+
     this.cfg = cfg;
     // Set partitionFields to empty, when the NonPartitionedExtractor is used
     if (NonPartitionedExtractor.class.getName().equals(cfg.partitionValueExtractorClass)) {
       LOG.warn("Set partitionFields to empty, since the NonPartitionedExtractor is used");
       cfg.partitionFields = new ArrayList<>();
     }
-    switch (hoodieHiveClient.getTableType()) {
-      case COPY_ON_WRITE:
-        this.snapshotTableName = cfg.tableName;
-        this.roTableTableName = Option.empty();
-        break;
-      case MERGE_ON_READ:
-        this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
-        this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
-            Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
-        break;
-      default:
-        LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
-        throw new InvalidTableException(hoodieHiveClient.getBasePath());
+    if (hoodieHiveClient != null) {
+      switch (hoodieHiveClient.getTableType()) {
+        case COPY_ON_WRITE:
+          this.snapshotTableName = cfg.tableName;
+          this.roTableName = Option.empty();
+          break;
+        case MERGE_ON_READ:
+          this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
+          this.roTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
+              Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
+          break;
+        default:
+          LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
+          throw new InvalidTableException(hoodieHiveClient.getBasePath());
+      }
     }
   }
 
   @Override
   public void syncHoodieTable() {
     try {
-      switch (hoodieHiveClient.getTableType()) {
-        case COPY_ON_WRITE:
-          syncHoodieTable(snapshotTableName, false);
-          break;
-        case MERGE_ON_READ:
-          // sync a RO table for MOR
-          syncHoodieTable(roTableTableName.get(), false);
-          // sync a RT table for MOR
-          syncHoodieTable(snapshotTableName, true);
-          break;
-        default:
-          LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
-          throw new InvalidTableException(hoodieHiveClient.getBasePath());
+      if (hoodieHiveClient != null) {
+        switch (hoodieHiveClient.getTableType()) {
+          case COPY_ON_WRITE:
+            syncHoodieTable(snapshotTableName, false);
+            break;
+          case MERGE_ON_READ:
+            // sync a RO table for MOR
+            syncHoodieTable(roTableName.get(), false);
+            // sync a RT table for MOR
+            syncHoodieTable(snapshotTableName, true);
+            break;
+          default:
+            LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
+            throw new InvalidTableException(hoodieHiveClient.getBasePath());
+        }
      }
     } catch (RuntimeException re) {
       throw new HoodieException("Got runtime exception when hive syncing " + cfg.tableName, re);
     } finally {
-      hoodieHiveClient.close();
+      if (hoodieHiveClient != null) {
+        hoodieHiveClient.close();
+      }
     }
   }
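Net effect of the rewrite: with `ignoreExceptions` set, a `RuntimeException` from `HoodieHiveClient` construction (say, an unreachable HiveServer2) is logged and swallowed, `hoodieHiveClient` stays null, and `syncHoodieTable()` quietly no-ops instead of failing the job. A minimal sketch of driving the tool programmatically — it assumes the standard public `HiveSyncConfig` fields, and every concrete value (database, table, path, JDBC URL) is hypothetical:

// Hedged sketch: invoking the sync tool directly with the new flag enabled.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}

val cfg = new HiveSyncConfig()
cfg.databaseName = "default"
cfg.tableName = "my_table"
cfg.basePath = "/tmp/hudi/my_table"
cfg.jdbcUrl = "jdbc:hive2://unreachable-host:10000" // placeholder, intentionally down
cfg.ignoreExceptions = true // tolerate a metastore/HiveServer2 that cannot be reached

val fs = FileSystem.get(new Path(cfg.basePath).toUri, new Configuration())
val hiveConf = new HiveConf()
hiveConf.addResource(fs.getConf)

// With ignoreExceptions = true, a connection failure inside the constructor is
// logged and swallowed; syncHoodieTable() then returns without doing anything.
new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable()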

@@ -40,6 +40,7 @@
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -613,4 +614,25 @@ public void testReadSchemaForMOR(boolean useJdbc) throws Exception {
"The last commit that was sycned should be 103");
}

@Test
public void testConnectExceptionIgnoreConfigSet() throws IOException, URISyntaxException {
HiveTestUtil.hiveSyncConfig.useJdbc = true;
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 5, false);
HoodieHiveClient hiveClient =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
// Lets do the sync

HiveSyncConfig syncToolConfig = HiveSyncConfig.copy(HiveTestUtil.hiveSyncConfig);
syncToolConfig.ignoreExceptions = true;
syncToolConfig.jdbcUrl = HiveTestUtil.hiveSyncConfig.jdbcUrl.replace("9999","9031");
HiveSyncTool tool = new HiveSyncTool(syncToolConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();

assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
}

}