Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(String syncM

// create empty commit
final String emptyCommitTime = "200";
HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime);
HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime, hiveSyncConfig.basePath);

HoodieHiveClient hiveClient =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ public static void setUp() throws IOException, InterruptedException, HiveExcepti
clear();
}

public static void clearIncrementalPullSetup(String path1, String path2) throws IOException, HiveException, MetaException {
Comment thread
pratyakshsharma marked this conversation as resolved.
fileSystem.delete(new Path(path1), true);
if (path2 != null) {
fileSystem.delete(new Path(path2), true);
}
clear();
}

public static void clear() throws IOException, HiveException, MetaException {
fileSystem.delete(new Path(hiveSyncConfig.basePath), true);
HoodieTableMetaClient.withPropertyBuilder()
Expand Down Expand Up @@ -157,23 +165,28 @@ public static void shutdown() {
}
}

public static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata)
throws IOException, URISyntaxException {
Path path = new Path(hiveSyncConfig.basePath);
FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
public static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata,
String basePath, String databaseName, String tableName) throws IOException, URISyntaxException {
Path path = new Path(basePath);
FileIOUtils.deleteDirectory(new File(basePath));
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(hiveSyncConfig.tableName)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(configuration, hiveSyncConfig.basePath);
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(tableName)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(configuration, basePath);

boolean result = fileSystem.mkdirs(path);
checkResult(result);
ZonedDateTime dateTime = ZonedDateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
useSchemaFromCommitMetadata, dateTime, instantTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createCommitFile(commitMetadata, instantTime);
useSchemaFromCommitMetadata, dateTime, instantTime, basePath);
createdTablesSet.add(databaseName + "." + tableName);
createCommitFile(commitMetadata, instantTime, basePath);
}

public static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata)
throws IOException, URISyntaxException {
createCOWTable(instantTime, numberOfPartitions, useSchemaFromCommitMetadata, hiveSyncConfig.basePath, hiveSyncConfig.databaseName, hiveSyncConfig.tableName);
}

public static void createCOWTableWithSchema(String instantTime, String schemaFileName)
Expand Down Expand Up @@ -207,7 +220,7 @@ public static void createCOWTableWithSchema(String instantTime, String schemaFil
writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schema.toString());
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createCommitFile(commitMetadata, instantTime);
createCommitFile(commitMetadata, instantTime, hiveSyncConfig.basePath);
}

public static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions,
Expand All @@ -225,7 +238,7 @@ public static void createMORTable(String commitTime, String deltaCommitTime, int
checkResult(result);
ZonedDateTime dateTime = ZonedDateTime.now();
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
useSchemaFromCommitMetadata, dateTime, commitTime);
useSchemaFromCommitMetadata, dateTime, commitTime, hiveSyncConfig.basePath);
createdTablesSet
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
createdTablesSet
Expand All @@ -247,24 +260,24 @@ public static void createMORTable(String commitTime, String deltaCommitTime, int
public static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime) throws IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata =
createPartitions(numberOfPartitions, isParquetSchemaSimple, useSchemaFromCommitMetadata, startFrom, instantTime);
createPartitions(numberOfPartitions, isParquetSchemaSimple, useSchemaFromCommitMetadata, startFrom, instantTime, hiveSyncConfig.basePath);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createCommitFile(commitMetadata, instantTime);
createCommitFile(commitMetadata, instantTime, hiveSyncConfig.basePath);
}

public static void addCOWPartition(String partitionPath, boolean isParquetSchemaSimple,
boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata =
createPartition(partitionPath, isParquetSchemaSimple, useSchemaFromCommitMetadata, instantTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createCommitFile(commitMetadata, instantTime);
createCommitFile(commitMetadata, instantTime, hiveSyncConfig.basePath);
}

public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime, String deltaCommitTime)
throws IOException, URISyntaxException, InterruptedException {
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, isParquetSchemaSimple,
useSchemaFromCommitMetadata, startFrom, instantTime);
useSchemaFromCommitMetadata, startFrom, instantTime, hiveSyncConfig.basePath);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
Expand Down Expand Up @@ -299,13 +312,13 @@ private static HoodieCommitMetadata createLogFiles(Map<String, List<HoodieWriteS
}

private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime) throws IOException, URISyntaxException {
boolean useSchemaFromCommitMetadata, ZonedDateTime startFrom, String instantTime, String basePath) throws IOException, URISyntaxException {
startFrom = startFrom.truncatedTo(ChronoUnit.DAYS);

HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
for (int i = 0; i < numberOfPartitions; i++) {
String partitionPath = startFrom.format(dtfOut);
Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
Path partPath = new Path(basePath + "/" + partitionPath);
fileSystem.makeQualified(partPath);
fileSystem.mkdirs(partPath);
List<HoodieWriteStat> writeStats = createTestData(partPath, isParquetSchemaSimple, instantTime);
Expand Down Expand Up @@ -369,7 +382,7 @@ private static void generateParquetData(Path filePath, boolean isParquetSchemaSi
}

private static void generateParquetDataWithSchema(Path filePath, Schema schema)
throws IOException, URISyntaxException {
throws IOException {
org.apache.parquet.schema.MessageType parquetSchema = new AvroSchemaConverter().convert(schema);
BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1,
BloomFilterTypeCode.SIMPLE.name());
Expand Down Expand Up @@ -433,9 +446,9 @@ private static void checkResult(boolean result) {
}
}

public static void createCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException {
public static void createCommitFile(HoodieCommitMetadata commitMetadata, String instantTime, String basePath) throws IOException {
byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
Path fullPath = new Path(hiveSyncConfig.basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
Path fullPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeCommitFileName(instantTime));
FSDataOutputStream fsout = fileSystem.create(fullPath, true);
fsout.write(bytes);
Expand All @@ -444,7 +457,7 @@ public static void createCommitFile(HoodieCommitMetadata commitMetadata, String

// Writes a commit file for the default test table after embedding the test
// schema (simple or evolved, per isSimpleSchema) into the commit metadata.
public static void createCommitFileWithSchema(HoodieCommitMetadata commitMetadata, String instantTime, boolean isSimpleSchema) throws IOException {
addSchemaToCommitMetadata(commitMetadata, isSimpleSchema, true);
createCommitFile(commitMetadata, instantTime, hiveSyncConfig.basePath);
}

private static void createCompactionCommitFile(HoodieCommitMetadata commitMetadata, String instantTime)
Expand Down
6 changes: 6 additions & 0 deletions hudi-utilities/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,12 @@
<groupId>org.antlr</groupId>
<artifactId>stringtemplate</artifactId>
<version>4.0.2</version>
<exclusions>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>antlr-runtime</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@

/**
* Utility to pull data after a given commit, based on the supplied HiveQL and save the delta as another hive temporary
* table.
* table. This temporary table can be further read using {@link org.apache.hudi.utilities.sources.HiveIncrPullSource} and the changes can
* be applied to the target table.
* <p>
* Current Limitations:
* <p>
Expand Down Expand Up @@ -146,7 +147,7 @@ public void saveDelta() throws IOException {
String tempDbTable = config.tmpDb + "." + config.targetTable + "__" + config.sourceTable;
String tempDbTablePath =
config.hoodieTmpDir + "/" + config.targetTable + "__" + config.sourceTable + "/" + lastCommitTime;
executeStatement("drop table " + tempDbTable, stmt);
executeStatement("drop table if exists " + tempDbTable, stmt);
deleteHDFSPath(fs, tempDbTablePath);
if (!ensureTempPathExists(fs, lastCommitTime)) {
throw new IllegalStateException("Could not create target path at "
Expand Down Expand Up @@ -185,12 +186,12 @@ private void executeIncrementalSQL(String tempDbTable, String tempDbTablePath, S
throw new HoodieIncrementalPullSQLException(
"Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable);
}
if (!incrementalSQL.contains("`_hoodie_commit_time` > '%targetBasePath'")) {
if (!incrementalSQL.contains("`_hoodie_commit_time` > '%s'")) {
LOG.error("Incremental SQL : " + incrementalSQL
+ " does not contain `_hoodie_commit_time` > '%targetBasePath'. Please add "
+ " does not contain `_hoodie_commit_time` > '%s'. Please add "
+ "this clause for incremental to work properly.");
throw new HoodieIncrementalPullSQLException(
"Incremental SQL does not have clause `_hoodie_commit_time` > '%targetBasePath', which "
"Incremental SQL does not have clause `_hoodie_commit_time` > '%s', which "
+ "means its not pulling incrementally");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,45 @@

package org.apache.hudi.utilities;

import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.utilities.exception.HoodieIncrementalPullSQLException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Instant;

import static org.apache.hudi.hive.testutils.HiveTestUtil.fileSystem;
import static org.apache.hudi.hive.testutils.HiveTestUtil.hiveSyncConfig;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestHiveIncrementalPuller {

private HiveIncrementalPuller.Config config;
private String targetBasePath = null;

@BeforeEach
// Creates a fresh puller config and brings up the shared Hive test
// environment before each test.
public void setup() throws HiveException, IOException, InterruptedException, MetaException {
config = new HiveIncrementalPuller.Config();
HiveTestUtil.setUp();
}

@AfterEach
// Deletes the temp dirs created by the test (targetBasePath stays null when
// no target table was created) and resets the shared Hive test state.
public void teardown() throws Exception {
HiveTestUtil.clearIncrementalPullSetup(config.hoodieTmpDir, targetBasePath);
}

@Test
Expand All @@ -41,4 +68,104 @@ public void testInitHiveIncrementalPuller() {

}

// Populates the puller config with the embedded-Hive connection settings,
// the source/target/tmp table coordinates, and the given incremental SQL
// (staged under a fresh per-test temp directory).
private HiveIncrementalPuller.Config getHivePullerConfig(String incrementalSql) throws IOException {
  // Connection settings mirror the embedded Hive used by HiveTestUtil.
  config.hiveJDBCUrl = hiveSyncConfig.jdbcUrl;
  config.hiveUsername = hiveSyncConfig.hiveUser;
  config.hivePassword = hiveSyncConfig.hivePass;
  // Source is the default test table; target and tmp live in their own DBs.
  config.sourceDb = hiveSyncConfig.databaseName;
  config.sourceTable = hiveSyncConfig.tableName;
  config.targetDb = "tgtdb";
  config.targetTable = "test2";
  config.tmpDb = "tmp_db";
  config.fromCommitTime = "100";
  // Stage the incremental SQL file under a per-test temp dir.
  config.hoodieTmpDir = Files.createTempDirectory("hivePullerTest").toUri().toString();
  createIncrementalSqlFile(incrementalSql, config);
  return config;
}

// Writes the incremental-pull SQL to <hoodieTmpDir>/incremental_pull.txt and
// points cfg.incrementalSQLFile at it.
//
// Unlike the earlier version, write failures are propagated as IOException
// rather than swallowed, so a test can never silently run against an empty
// SQL file; the content is written explicitly as UTF-8 instead of relying on
// FileWriter's platform default charset.
private void createIncrementalSqlFile(String text, HiveIncrementalPuller.Config cfg) throws IOException {
  java.nio.file.Path path = Paths.get(cfg.hoodieTmpDir + "/incremental_pull.txt");
  Files.createDirectories(path.getParent());
  // Files.write creates the file (or truncates a leftover one) atomically
  // with the write, so no separate createFile call is needed.
  Files.write(path, text.getBytes(StandardCharsets.UTF_8));
  cfg.incrementalSQLFile = path.toString();
}

// Creates the default COW test table (5 partitions, schema taken from commit
// metadata) and syncs it into Hive over JDBC so the puller has a source.
private void createSourceTable() throws IOException, URISyntaxException {
String instantTime = "101";
HiveTestUtil.createCOWTable(instantTime, 5, true);
// Force JDBC sync mode before the tool reads the config.
hiveSyncConfig.syncMode = "jdbc";
HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
}

// Creates the target COW table (tgtdb.test2) in its own temp base path and
// syncs it to Hive. The path is remembered in targetBasePath so teardown can
// delete it.
private void createTargetTable() throws IOException, URISyntaxException {
String instantTime = "100";
targetBasePath = Files.createTempDirectory("hivesynctest1" + Instant.now().toEpochMilli()).toUri().toString();
HiveTestUtil.createCOWTable(instantTime, 5, true,
targetBasePath, "tgtdb", "test2");
HiveSyncTool tool = new HiveSyncTool(getTargetHiveSyncConfig(targetBasePath), HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
}

// Returns a copy of the default sync config repointed at the target table
// (tgtdb.test2) under the given base path, using JDBC sync mode.
private HiveSyncConfig getTargetHiveSyncConfig(String basePath) {
  // Distinct local name avoids shadowing the puller-config field `config`.
  HiveSyncConfig targetConfig = HiveSyncConfig.copy(hiveSyncConfig);
  targetConfig.databaseName = "tgtdb";
  targetConfig.tableName = "test2";
  targetConfig.syncMode = "jdbc";
  targetConfig.batchSyncNum = 3;
  targetConfig.basePath = basePath;
  return targetConfig;
}

// Returns a copy of the default sync config pointed at the given database;
// used only to build the client that asserts on the puller's output.
private HiveSyncConfig getAssertionSyncConfig(String databaseName) {
  HiveSyncConfig assertionConfig = HiveSyncConfig.copy(hiveSyncConfig);
  assertionConfig.databaseName = databaseName;
  return assertionConfig;
}

// Provisions both ends of the pull: the default source test table and the
// target table tgtdb.test2.
private void createTables() throws IOException, URISyntaxException {
createSourceTable();
createTargetTable();
}

@Test
public void testPullerWithoutIncrementalClause() throws IOException, URISyntaxException {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these tests only seem to be testing some failure scenarios? not the happy path?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was expecting this comment :)

I am actually facing some error with the happy flow test case. Once I am able to fix it, will add it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pratyakshsharma Is the patch ready? If not, can you please update the happy flow test case even if it's failing. I can take over and try to fix it.

createTables();
HiveIncrementalPuller puller = new HiveIncrementalPuller(getHivePullerConfig(
"select name from testdb.test1"));
Exception e = assertThrows(HoodieIncrementalPullSQLException.class, puller::saveDelta,
"Should fail when incremental clause not provided!");
assertTrue(e.getMessage().contains("Incremental SQL does not have clause `_hoodie_commit_time` > '%s', which means its not pulling incrementally"));
}

@Test
public void testPullerWithoutSourceInSql() throws IOException, URISyntaxException {
  // SQL that never references the configured source table must be rejected,
  // even when the incremental clause itself is present.
  createTables();
  HiveIncrementalPuller incrementalPuller = new HiveIncrementalPuller(
      getHivePullerConfig("select name from tgtdb.test2 where `_hoodie_commit_time` > '%s'"));
  Exception exception = assertThrows(HoodieIncrementalPullSQLException.class, incrementalPuller::saveDelta,
      "Should fail when source db and table names not provided!");
  assertTrue(exception.getMessage().contains("Incremental SQL does not have testdb.test1"));
}

@Test
public void testPuller() throws IOException, URISyntaxException {
  // Happy path: run the puller end-to-end and verify the delta landed in the
  // temp table <targetTable>__<sourceTable> inside the tmp database.
  createTables();
  HiveIncrementalPuller.Config pullerConfig =
      getHivePullerConfig("select name from testdb.test1 where `_hoodie_commit_time` > '%s'");
  HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
  hiveClient.createDatabase(pullerConfig.tmpDb);
  new HiveIncrementalPuller(pullerConfig).saveDelta();
  HiveSyncConfig assertingConfig = getAssertionSyncConfig(pullerConfig.tmpDb);
  HoodieHiveClient assertingClient = new HoodieHiveClient(assertingConfig, HiveTestUtil.getHiveConf(), fileSystem);
  assertTrue(assertingClient.doesTableExist(pullerConfig.targetTable + "__" + pullerConfig.sourceTable));
}

}