@@ -55,6 +55,7 @@
import java.util.stream.Stream;

import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
import static org.apache.hudi.hive.testutils.HiveTestService.HS2_JDBC_URL;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
@@ -180,7 +181,7 @@ private static TypedProperties getProperties() {
// Make path selection test suite specific
props.setProperty("hoodie.deltastreamer.source.input.selector", DFSTestSuitePathSelector.class.getName());
// Hive Configs
props.setProperty(HIVE_URL.key(), "jdbc:hive2://127.0.0.1:9999/");
props.setProperty(HIVE_URL.key(), HS2_JDBC_URL);
props.setProperty(META_SYNC_DATABASE_NAME.key(), "testdb1");
props.setProperty(META_SYNC_TABLE_NAME.key(), "table1");
props.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
@@ -64,7 +64,8 @@ public HiveSyncConfig(Properties props) {

public HiveSyncConfig(Properties props, Configuration hadoopConf) {
super(props, hadoopConf);
HiveConf hiveConf = new HiveConf(hadoopConf, HiveConf.class);
HiveConf hiveConf = hadoopConf instanceof HiveConf
? (HiveConf) hadoopConf : new HiveConf(hadoopConf, HiveConf.class);
// HiveConf needs to load fs conf to allow instantiation via AWSGlueClientFactory
hiveConf.addResource(getHadoopFileSystem().getConf());
setHadoopConf(hiveConf);
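
For context, a minimal sketch of the conf-reuse pattern this hunk introduces, using only the Hadoop and Hive classes already referenced here (the class and method names below are illustrative, not part of the change): re-initializing a fresh HiveConf re-applies hive-site.xml and default ConfVars, which in tests may not match values already set on the instance being passed in, whereas the instanceof check keeps the original instance and its settings as-is.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;

public class HiveConfReuseSketch {
  // Illustrative helper mirroring the change: reuse a HiveConf if we were given one.
  static HiveConf resolveHiveConf(Configuration hadoopConf) {
    return hadoopConf instanceof HiveConf
        ? (HiveConf) hadoopConf
        : new HiveConf(hadoopConf, HiveConf.class);
  }

  public static void main(String[] args) {
    HiveConf preconfigured = new HiveConf();
    preconfigured.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:9083");
    // Same instance comes back, so programmatically set values survive untouched.
    System.out.println(resolveHiveConf(preconfigured) == preconfigured); // prints true
  }
}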
@@ -86,11 +86,12 @@
public class TestHiveSyncTool {

private static final List<Object> SYNC_MODES = Arrays.asList(
"hiveql",
"hms",
"jdbc");

private static Iterable<Object> syncMode() {
return SYNC_MODES; // TODO include hiveql; skipped due to CI issue
return SYNC_MODES;
}

// useSchemaFromCommitMetadata, syncMode
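
As an aside, a self-contained sketch (not the real test class; names are illustrative) of the JUnit 5 @MethodSource pattern a provider like syncMode() feeds: every entry in SYNC_MODES becomes one invocation of each parameterized test, so adding "hiveql" back to the list re-enables that mode across the suite.

import java.util.Arrays;
import java.util.List;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import static org.junit.jupiter.api.Assertions.assertTrue;

class SyncModeSourceSketch {
  private static final List<Object> SYNC_MODES = Arrays.asList("hiveql", "hms", "jdbc");

  // Provider method: JUnit resolves it by name through @MethodSource.
  static Iterable<Object> syncMode() {
    return SYNC_MODES;
  }

  @ParameterizedTest
  @MethodSource("syncMode")
  void runsOncePerSyncMode(String mode) {
    // Runs three times, once each for "hiveql", "hms", and "jdbc".
    assertTrue(SYNC_MODES.contains(mode));
  }
}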
@@ -19,7 +19,7 @@

package org.apache.hudi.hive.replication;

import org.apache.hudi.hive.testutils.TestCluster;
import org.apache.hudi.hive.testutils.HiveTestCluster;

import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
@@ -51,9 +51,9 @@
public class TestHiveSyncGlobalCommitTool {

@RegisterExtension
public static TestCluster localCluster = new TestCluster();
public static HiveTestCluster localCluster = new HiveTestCluster();
@RegisterExtension
public static TestCluster remoteCluster = new TestCluster();
public static HiveTestCluster remoteCluster = new HiveTestCluster();

private static final String DB_NAME = "foo";
private static final String TBL_NAME = "bar";
@@ -18,7 +18,6 @@

package org.apache.hudi.hive.testutils;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
@@ -30,6 +29,7 @@
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.NetworkTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.common.util.FileIOUtils;
@@ -39,10 +39,10 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
@@ -58,14 +58,14 @@
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.runners.model.InitializationError;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
@@ -76,16 +76,14 @@

import static org.junit.jupiter.api.Assertions.fail;

public class TestCluster implements BeforeAllCallback, AfterAllCallback,
BeforeEachCallback, AfterEachCallback {
private HdfsTestService hdfsTestService;
public HiveTestService hiveTestService;
private Configuration conf;
public HiveServer2 server2;
private static volatile int port = 9083;
public class HiveTestCluster implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback {
public MiniDFSCluster dfsCluster;
DateTimeFormatter dtfOut;
public File hiveSiteXml;
private HdfsTestService hdfsTestService;
private HiveTestService hiveTestService;
private HiveConf conf;
private HiveServer2 server2;
private DateTimeFormatter dtfOut;
private File hiveSiteXml;
private IMetaStoreClient client;

@Override
@@ -110,24 +108,19 @@ public void setup() throws Exception {
hdfsTestService = new HdfsTestService();
dfsCluster = hdfsTestService.start(true);

conf = hdfsTestService.getHadoopConf();
conf.setInt(ConfVars.METASTORE_SERVER_PORT.varname, port++);
conf.setInt(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, port++);
conf.setInt(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname, port++);
hiveTestService = new HiveTestService(conf);
Configuration hadoopConf = hdfsTestService.getHadoopConf();
hadoopConf.setInt(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, NetworkTestUtils.nextFreePort());
hiveTestService = new HiveTestService(hadoopConf);
server2 = hiveTestService.start();
dtfOut = DateTimeFormatter.ofPattern("yyyy/MM/dd");
hiveSiteXml = File.createTempFile("hive-site", ".xml");
hiveSiteXml.deleteOnExit();
conf = hiveTestService.getHiveConf();
try (OutputStream os = new FileOutputStream(hiveSiteXml)) {
hiveTestService.getServerConf().writeXml(os);
conf.writeXml(os);
}
client = HiveMetaStoreClient.newSynchronizedClient(
RetryingMetaStoreClient.getProxy(hiveTestService.getServerConf(), true));
}

public Configuration getConf() {
return this.conf;
RetryingMetaStoreClient.getProxy(conf, true));
}

public String getHiveSiteXmlLocation() {
@@ -139,7 +132,7 @@ public IMetaStoreClient getHMSClient() {
}

public String getHiveJdBcUrl() {
return "jdbc:hive2://127.0.0.1:" + conf.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname) + "";
return hiveTestService.getJdbcHive2Url();
}

public String tablePath(String dbName, String tableName) throws Exception {
@@ -152,12 +145,12 @@ private String dbPath(String dbName) throws Exception {

public void forceCreateDb(String dbName) throws Exception {
try {
getHMSClient().dropDatabase(dbName);
} catch (NoSuchObjectException e) {
System.out.println("db does not exist but its ok " + dbName);
client.dropDatabase(dbName);
} catch (NoSuchObjectException ignored) {
// expected
}
Database db = new Database(dbName, "", dbPath(dbName), new HashMap<>());
getHMSClient().createDatabase(db);
client.createDatabase(db);
}

public void createCOWTable(String commitTime, int numberOfPartitions, String dbName, String tableName)
@@ -170,10 +163,7 @@ public void createCOWTable(String commitTime, int numberOfPartitions, String dbN
.setTableName(tableName)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(conf, path.toString());
boolean result = dfsCluster.getFileSystem().mkdirs(path);
if (!result) {
throw new InitializationError("cannot initialize table");
}
dfsCluster.getFileSystem().mkdirs(path);
ZonedDateTime dateTime = ZonedDateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime, path.toString());
createCommitFile(commitMetadata, commitTime, path.toString());
@@ -240,7 +230,7 @@ private void generateParquetData(Path filePath, boolean isParquetSchemaSimple)
try {
writer.write(s);
} catch (IOException e) {
fail("IOException while writing test records as parquet" + e.toString());
fail("IOException while writing test records as parquet", e);
}
});
writer.close();
@@ -260,15 +250,15 @@ public void stopHiveServer2() {
public void startHiveServer2() {
if (server2 == null) {
server2 = new HiveServer2();
server2.init(hiveTestService.getServerConf());
server2.init(hiveTestService.getHiveConf());
server2.start();
}
}

public void shutDown() throws IOException {
stopHiveServer2();
Files.deleteIfExists(hiveSiteXml.toPath());
Hive.closeCurrent();
hiveTestService.getHiveMetaStore().stop();
hiveTestService.stop();
hdfsTestService.stop();
FileSystem.closeAll();
}