4 changes: 4 additions & 0 deletions docker/demo/presto-batch1.commands
@@ -0,0 +1,4 @@
select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
select symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
select symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG';
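For orientation: the Presto CLI emits CSV-quoted rows when run non-interactively, so after the first ingested batch the queries above are expected to produce rows like the following (values come from the demo's batch_1.json and are asserted in ITTestHoodieDemo below):

    "GOOG","2018-08-31 10:29:00"
    "GOOG","2018-08-31 09:59:00","6330","1230.5","1230.02"
    "GOOG","2018-08-31 10:29:00","3391","1230.1899","1230.085"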
2 changes: 2 additions & 0 deletions docker/demo/presto-batch2-after-compaction.commands
@@ -0,0 +1,2 @@
select symbol, max(ts) from stock_ticks_mor group by symbol HAVING symbol = 'GOOG';
select symbol, ts, volume, open, close from stock_ticks_mor where symbol = 'GOOG';
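After compaction, these MOR queries are expected to reflect the second batch, e.g. (asserted in testPrestoAfterSecondBatchAfterCompaction below):

    "GOOG","2018-08-31 10:59:00"
    "GOOG","2018-08-31 10:59:00","9021","1227.1993","1227.215"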
1 change: 1 addition & 0 deletions docker/demo/presto-table-check.commands
@@ -0,0 +1 @@
show tables;
@@ -52,6 +52,7 @@ public abstract class ITTestBase {
protected static final String ADHOC_1_CONTAINER = "/adhoc-1";
protected static final String ADHOC_2_CONTAINER = "/adhoc-2";
protected static final String HIVESERVER = "/hiveserver";
protected static final String PRESTO_COORDINATOR = "/presto-coordinator-1";
protected static final String HOODIE_WS_ROOT = "/var/hoodie/ws";
protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_app.sh";
protected static final String HUDI_HADOOP_BUNDLE =
@@ -63,6 +64,7 @@ public abstract class ITTestBase {
protected static final String HUDI_UTILITIES_BUNDLE =
HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-utilities.jar";
protected static final String HIVE_SERVER_JDBC_URL = "jdbc:hive2://hiveserver:10000";
protected static final String PRESTO_COORDINATOR_URL = "presto-coordinator-1:8090";
protected static final String HADOOP_CONF_DIR = "/etc/hadoop";

// Skip these lines when capturing output from hive
@@ -106,6 +108,14 @@ static String getSparkShellCommand(String commandFile) {
.append(" --packages com.databricks:spark-avro_2.11:4.0.0 ").append(" -i ").append(commandFile).toString();
}

static String getPrestoConsoleCommand(String commandFile) {
StringBuilder builder = new StringBuilder().append("presto --server " + PRESTO_COORDINATOR_URL)
.append(" --catalog hive --schema default")
.append(" -f " + commandFile);
System.out.println("Presto command: " + builder.toString());
return builder.toString();
}
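With the constants above, the generated console command takes the form (shown here for the batch-1 command file copied to /usr/hive/data/input):

    presto --server presto-coordinator-1:8090 --catalog hive --schema default -f /usr/hive/data/input/presto-batch1.commands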

@Before
public void init() {
String dockerHost = (OVERRIDDEN_DOCKER_HOST != null) ? OVERRIDDEN_DOCKER_HOST : DEFAULT_DOCKER_HOST;
@@ -207,6 +217,20 @@ Pair<String, String> executeSparkSQLCommand(String commandFile, boolean expected
return Pair.of(callback.getStdout().toString(), callback.getStderr().toString());
}

Pair<String, String> executePrestoCommandFile(String commandFile) throws Exception {
String prestoCmd = getPrestoConsoleCommand(commandFile);
TestExecStartResultCallback callback = executeCommandStringInDocker(PRESTO_COORDINATOR, prestoCmd, true);
return Pair.of(callback.getStdout().toString().trim(), callback.getStderr().toString().trim());
}

void executePrestoCopyCommand(String fromFile, String remotePath) {
// docker-java tars the host resource and extracts it at remotePath inside the Presto coordinator container
Container prestoCoordinatorContainer = runningContainers.get(PRESTO_COORDINATOR);
dockerClient.copyArchiveToContainerCmd(prestoCoordinatorContainer.getId())
.withHostResource(fromFile)
.withRemotePath(remotePath)
.exec();
}

private void saveUpLogs() {
try {
// save up the Hive log files for introspection
@@ -34,9 +34,15 @@ public class ITTestHoodieDemo extends ITTestBase {
private static String HDFS_DATA_DIR = "/usr/hive/data/input";
private static String HDFS_BATCH_PATH1 = HDFS_DATA_DIR + "/" + "batch_1.json";
private static String HDFS_BATCH_PATH2 = HDFS_DATA_DIR + "/" + "batch_2.json";
private static String HDFS_PRESTO_INPUT_TABLE_CHECK_PATH = HDFS_DATA_DIR + "/" + "presto-table-check.commands";
private static String HDFS_PRESTO_INPUT_BATCH1_PATH = HDFS_DATA_DIR + "/" + "presto-batch1.commands";
private static String HDFS_PRESTO_INPUT_BATCH2_PATH = HDFS_DATA_DIR + "/" + "presto-batch2-after-compaction.commands";

private static String INPUT_BATCH_PATH1 = HOODIE_WS_ROOT + "/docker/demo/data/batch_1.json";
private static String PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH = "/docker/demo/presto-table-check.commands";
private static String PRESTO_INPUT_BATCH1_RELATIVE_PATH = "/docker/demo/presto-batch1.commands";
private static String INPUT_BATCH_PATH2 = HOODIE_WS_ROOT + "/docker/demo/data/batch_2.json";
private static String PRESTO_INPUT_BATCH2_RELATIVE_PATH = "/docker/demo/presto-batch2-after-compaction.commands";

private static String COW_BASE_PATH = "/user/hive/warehouse/stock_ticks_cow";
private static String MOR_BASE_PATH = "/user/hive/warehouse/stock_ticks_mor";
@@ -55,7 +61,6 @@ public class ITTestHoodieDemo extends ITTestBase {
private static String HIVE_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch2-after-compaction.commands";
private static String HIVE_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental.commands";


private static String HIVE_SYNC_CMD_FMT =
" --enable-hive-sync " + " --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000 "
+ " --hoodie-conf hoodie.datasource.hive_sync.username=hive "
@@ -64,26 +69,28 @@
+ " --hoodie-conf hoodie.datasource.hive_sync.database=default "
+ " --hoodie-conf hoodie.datasource.hive_sync.table=%s";


@Test
public void testDemo() throws Exception {
setupDemo();

// batch 1
ingestFirstBatchAndHiveSync();
testHiveAfterFirstBatch();
testPrestoAfterFirstBatch();
testSparkSQLAfterFirstBatch();

// batch 2
ingestSecondBatchAndHiveSync();
testHiveAfterSecondBatch();
testPrestoAfterSecondBatch();
testSparkSQLAfterSecondBatch();
testIncrementalHiveQuery();
testIncrementalSparkSQLQuery();

// compaction
scheduleAndRunCompaction();
testHiveAfterSecondBatchAfterCompaction();
testPrestoAfterSecondBatchAfterCompaction();
testIncrementalHiveQueryAfterCompaction();
}

@@ -94,6 +101,16 @@ private void setupDemo() throws Exception {
.add("hdfs dfs -copyFromLocal -f " + INPUT_BATCH_PATH1 + " " + HDFS_BATCH_PATH1)
.add("/bin/bash " + DEMO_CONTAINER_SCRIPT).build();
executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds);

// create input dir in presto coordinator
cmds = new ImmutableList.Builder<String>()
.add("mkdir -p " + HDFS_DATA_DIR).build();
executeCommandStringsInDocker(PRESTO_COORDINATOR, cmds);

// copy presto sql files to presto coordinator
// NOTE: presumes tests run from the integ-test module dir, so user.dir + "/.." resolves to the repo root containing docker/demo
executePrestoCopyCommand(System.getProperty("user.dir") + "/.." + PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH, HDFS_DATA_DIR);
executePrestoCopyCommand(System.getProperty("user.dir") + "/.." + PRESTO_INPUT_BATCH1_RELATIVE_PATH, HDFS_DATA_DIR);
executePrestoCopyCommand(System.getProperty("user.dir") + "/.." + PRESTO_INPUT_BATCH2_RELATIVE_PATH, HDFS_DATA_DIR);
}

private void ingestFirstBatchAndHiveSync() throws Exception {
@@ -168,6 +185,20 @@ private void ingestSecondBatchAndHiveSync() throws Exception {
executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds);
}

private void testPrestoAfterFirstBatch() throws Exception {
Pair<String, String> stdOutErrPair = executePrestoCommandFile(HDFS_PRESTO_INPUT_TABLE_CHECK_PATH);
assertStdOutContains(stdOutErrPair, "stock_ticks_cow");
// two matches: "stock_ticks_mor" is also a prefix of the realtime table name
assertStdOutContains(stdOutErrPair, "stock_ticks_mor", 2);

stdOutErrPair = executePrestoCommandFile(HDFS_PRESTO_INPUT_BATCH1_PATH);
// 10:29 is max(ts) for both tables and also prefixes one projection row per table => 4 matches
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 10:29:00\"", 4);
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\"", 2);
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 10:29:00\",\"3391\",\"1230.1899\",\"1230.085\"", 2);
}

private void testHiveAfterSecondBatch() throws Exception {
Pair<String, String> stdOutErrPair = executeHiveCommandFile(HIVE_BATCH1_COMMANDS);
assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n" + "+---------+----------------------+\n"
@@ -187,6 +218,20 @@ private void testHiveAfterSecondBatch() throws Exception {
2);
}

private void testPrestoAfterSecondBatch() throws Exception {
Pair<String, String> stdOutErrPair = executePrestoCommandFile(HDFS_PRESTO_INPUT_BATCH1_PATH);
// COW already reflects the second batch (max ts 10:59); the MOR read-optimized view still shows 10:29 until compaction
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 10:29:00\"", 2);
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 10:59:00\"", 2);
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\"", 2);
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 10:29:00\",\"3391\",\"1230.1899\",\"1230.085\"");
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 10:59:00\",\"9021\",\"1227.1993\",\"1227.215\"");
}

private void testHiveAfterSecondBatchAfterCompaction() throws Exception {
Pair<String, String> stdOutErrPair = executeHiveCommandFile(HIVE_BATCH2_COMMANDS);
assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n" + "+---------+----------------------+\n"
@@ -199,6 +244,16 @@ private void testHiveAfterSecondBatchAfterCompaction() throws Exception {
2);
}

private void testPrestoAfterSecondBatchAfterCompaction() throws Exception {
Pair<String, String> stdOutErrPair = executePrestoCommandFile(HDFS_PRESTO_INPUT_BATCH2_PATH);
// after compaction the MOR read-optimized view reflects the second batch (max ts 10:59)
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 10:59:00\"", 2);
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\"");
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 10:59:00\",\"9021\",\"1227.1993\",\"1227.215\"");
}

private void testSparkSQLAfterSecondBatch() throws Exception {
Pair<String, String> stdOutErrPair = executeSparkSQLCommand(SPARKSQL_BATCH2_COMMANDS, true);
assertStdOutContains(stdOutErrPair,