@@ -22,7 +22,6 @@
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.exception.TableNotFoundException;
 
@@ -106,10 +105,13 @@ public String createTable(
       throw new IllegalStateException("Table already existing in path : " + path);
     }
 
-    final HoodieTableType tableType = HoodieTableType.valueOf(tableTypeStr);
-    HoodieTableMetaClient.initTableType(HoodieCLI.conf, path, tableType, name, archiveFolder,
-        payloadClass, layoutVersion);
-
+    HoodieTableMetaClient.withPropertyBuilder()
+      .setTableType(tableTypeStr)
+      .setTableName(name)
+      .setArchiveLogFolder(archiveFolder)
+      .setPayloadClassName(payloadClass)
+      .setTimelineLayoutVersion(layoutVersion)
+      .initTable(HoodieCLI.conf, path);
     // Now connect to ensure loading works
     return connect(path, layoutVersion, false, 0, 0, 0);
   }
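Note: this PR replaces the positional HoodieTableMetaClient.initTableType(...) overloads with the fluent withPropertyBuilder() chain seen above. A minimal standalone sketch of the new-table path, using only setters that appear in this diff; the Configuration, base path, and property values are illustrative, not taken from the PR:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;

public class InitTableSketch {
  public static void main(String[] args) throws Exception {
    // Hadoop configuration and base path for the table (illustrative values).
    Configuration conf = new Configuration();
    String basePath = "/tmp/hoodie/sample_table";

    // Each setter records one table property; initTable persists them under
    // basePath/.hoodie and bootstraps the table.
    HoodieTableMetaClient.withPropertyBuilder()
        .setTableType("COPY_ON_WRITE")            // string overload, as used by createTable above
        .setTableName("sample_table")
        .setArchiveLogFolder("archived")
        .setPayloadClassName("org.apache.hudi.common.model.HoodieAvroPayload")
        .setTimelineLayoutVersion(1)              // mirrors the layoutVersion argument above
        .initTable(conf, basePath);
  }
}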
@@ -265,9 +265,14 @@ private void bootstrapFromFilesystem(HoodieEngineContext engineContext, HoodieTa
     String createInstantTime = latestInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
     LOG.info("Creating a new metadata table in " + metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
 
-    HoodieTableMetaClient.initTableType(hadoopConf.get(), metadataWriteConfig.getBasePath(),
-        HoodieTableType.MERGE_ON_READ, tableName, "archived", HoodieMetadataPayload.class.getName(),
-        HoodieFileFormat.HFILE.toString());
+    HoodieTableMetaClient.withPropertyBuilder()
+      .setTableType(HoodieTableType.MERGE_ON_READ)
+      .setTableName(tableName)
+      .setArchiveLogFolder("archived")
+      .setPayloadClassName(HoodieMetadataPayload.class.getName())
+      .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
+      .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
+
     initTableMetadata();
 
     // List all partitions in the basePath of the containing dataset
@@ -349,9 +349,11 @@ private void testUpsertsInternal(HoodieWriteConfig config,
     HoodieWriteConfig hoodieWriteConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
         .withProps(config.getProps()).withTimelineLayoutVersion(
         VERSION_0).build();
-    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
-        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
-        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    HoodieTableMetaClient.withPropertyBuilder()
+      .fromMetaClient(metaClient)
+      .setTimelineLayoutVersion(VERSION_0)
+      .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
+
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
 
     // Write 1 (only inserts)
@@ -493,10 +495,11 @@ private void testHoodieConcatHandle(HoodieWriteConfig config, boolean isPrepped)
     HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
         .withProps(config.getProps()).withMergeAllowDuplicateOnInserts(true).withTimelineLayoutVersion(
         VERSION_0).build();
+    HoodieTableMetaClient.withPropertyBuilder()
+      .fromMetaClient(metaClient)
+      .setTimelineLayoutVersion(VERSION_0)
+      .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
 
-    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(), metaClient.getTableType(),
-        metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
-        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
 
     // Write 1 (only inserts)
@@ -629,9 +632,11 @@ private void testUpsertsUpdatePartitionPath(IndexType indexType, HoodieWriteConf
         .withBloomIndexUpdatePartitionPath(true)
         .withGlobalSimpleIndexUpdatePartitionPath(true)
         .build()).withTimelineLayoutVersion(VERSION_0).build();
-    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(),
-        metaClient.getTableType(), metaClient.getTableConfig().getTableName(), metaClient.getArchivePath(),
-        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+
+    HoodieTableMetaClient.withPropertyBuilder()
+      .fromMetaClient(metaClient)
+      .setTimelineLayoutVersion(VERSION_0)
+      .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
     // Set rollback to LAZY so no inflights are deleted
     hoodieWriteConfig.getProps().put(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY_PROP,
         HoodieFailedWritesCleaningPolicy.LAZY.name());
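Note: the three test hunks above share one shape: fromMetaClient(metaClient) seeds the builder with the existing table's configuration, individual properties are then overridden, and initTable re-initializes the table in place. A minimal sketch of just that pattern; it assumes a metaClient already constructed elsewhere (e.g. in test setup), as in these tests:

// Re-initialize an existing table, overriding a single property.
// fromMetaClient copies the current table config (name, type, payload class, ...).
HoodieTableMetaClient.withPropertyBuilder()
    .fromMetaClient(metaClient)
    .setTimelineLayoutVersion(0)  // the tests pass VERSION_0; a literal 0 keeps the sketch self-contained
    .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());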
@@ -75,8 +75,11 @@ protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
   @Test
   public void readLocalWriteHDFS() throws Exception {
     // Initialize table and filesystem
-    HoodieTableMetaClient.initTableType(hadoopConf, dfsBasePath, HoodieTableType.valueOf(tableType),
-        tableName, HoodieAvroPayload.class.getName());
+    HoodieTableMetaClient.withPropertyBuilder()
+      .setTableType(tableType)
+      .setTableName(tableName)
+      .setPayloadClass(HoodieAvroPayload.class)
+      .initTable(hadoopConf, dfsBasePath);
 
     // Create write client to write some records in
     HoodieWriteConfig cfg = getHoodieWriteConfig(dfsBasePath);
@@ -100,8 +103,11 @@ public void readLocalWriteHDFS() throws Exception {
     assertEquals(readRecords.count(), records.size(), "Should contain 100 records");
 
     // Write to local
-    HoodieTableMetaClient.initTableType(hadoopConf, tablePath, HoodieTableType.valueOf(tableType),
-        tableName, HoodieAvroPayload.class.getName());
+    HoodieTableMetaClient.withPropertyBuilder()
+      .setTableType(tableType)
+      .setTableName(tableName)
+      .setPayloadClass(HoodieAvroPayload.class)
+      .initTable(hadoopConf, tablePath);
 
     String writeCommitTime = localWriteClient.startCommit();
     LOG.info("Starting write commit " + writeCommitTime);
@@ -150,9 +150,11 @@ public void testMORTable() throws Exception {
     tableType = HoodieTableType.MERGE_ON_READ;
 
     // Create the table
-    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(),
-        HoodieTableType.MERGE_ON_READ, metaClient.getTableConfig().getTableName(),
-        metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_1);
+    HoodieTableMetaClient.withPropertyBuilder()
+      .fromMetaClient(metaClient)
+      .setTableType(HoodieTableType.MERGE_ON_READ)
+      .setTimelineLayoutVersion(VERSION_1)
+      .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
 
     HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA);
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
@@ -295,9 +297,10 @@ public void testMORTable() throws Exception {
   @Test
   public void testCopyOnWriteTable() throws Exception {
     // Create the table
-    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), metaClient.getBasePath(),
-        HoodieTableType.COPY_ON_WRITE, metaClient.getTableConfig().getTableName(),
-        metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_1);
+    HoodieTableMetaClient.withPropertyBuilder()
+      .fromMetaClient(metaClient)
+      .setTimelineLayoutVersion(VERSION_1)
+      .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
 
     HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA);
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
@@ -24,7 +24,6 @@
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroPayload;
-import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -117,10 +116,13 @@ public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, Strin
 
   @Override
   public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException {
-    props.putIfAbsent(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, PARQUET.toString());
-    props.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME);
-    props.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, COPY_ON_WRITE.name());
-    props.putIfAbsent(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, HoodieAvroPayload.class.getName());
+    props = HoodieTableMetaClient.withPropertyBuilder()
+      .setTableName(RAW_TRIPS_TEST_NAME)
+      .setTableType(COPY_ON_WRITE)
+      .setPayloadClass(HoodieAvroPayload.class)
+      .setBaseFileFormat(PARQUET.toString())
+      .fromProperties(props)
+      .build();
     return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
   }
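Note: build() is the third entry point shown in this PR: rather than initializing a table directly, it folds the setter values together with the caller-supplied Properties (via fromProperties) and returns a plain java.util.Properties that initTableAndGetMetaClient then consumes. A minimal sketch of that flow; the class wrapper, table name, path, and Configuration are illustrative, not taken from the PR:

import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;

public class BuildPropsSketch {
  public static void main(String[] args) throws Exception {
    // Fold the builder's settings into a Properties object instead of
    // initializing the table directly.
    Properties props = HoodieTableMetaClient.withPropertyBuilder()
        .setTableName("raw_trips")                 // illustrative name
        .setTableType("COPY_ON_WRITE")             // string overload of setTableType
        .setPayloadClass(HoodieAvroPayload.class)
        .fromProperties(new Properties())          // merge in caller-supplied properties, if any
        .build();                                  // returns java.util.Properties

    // Same static helper as in the hunk above: creates the table at the base
    // path from the properties and returns a meta client for it.
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.initTableAndGetMetaClient(
        new Configuration(), "/tmp/hoodie/raw_trips", props);
  }
}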
