Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom

/**
 * Instantiates the engine-specific {@link HoodieTable}.
 *
 * @param config     the write config
 * @param hadoopConf the hadoop configuration
 * @return the engine-specific {@link HoodieTable} instance
 */
protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf);

/**
 * Instantiates the engine-specific {@link HoodieTable} backed by the given meta client,
 * so implementations can reuse an already-resolved {@link HoodieTableMetaClient}.
 *
 * @param config     the write config
 * @param hadoopConf the hadoop configuration
 * @param metaClient the meta client to back the table
 * @return the engine-specific {@link HoodieTable} instance
 */
protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient);

void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
if (writeTimer != null) {
long durationInMs = metrics.getDurationInMs(writeTimer.stop());
Expand Down Expand Up @@ -1425,17 +1427,38 @@ public HoodieMetrics getMetrics() {
}

/**
 * Performs necessary bootstrapping operations (for ex, validating whether Metadata Table has to be bootstrapped).
 *
 * <p>NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY OPERATIONS
 * NOT REQUIRING EXTERNAL SYNCHRONIZATION
 *
 * @param metaClient instance of {@link HoodieTableMetaClient}
 * @param instantTime current inflight instant time
 * @param initialMetadataTableIfNecessary whether the metadata table should be bootstrapped as part of init
 */
protected void doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
  // When an instant is supplied, own the transaction with that inflight instant so that
  // the lock is attributed to the ongoing write operation.
  Option<HoodieInstant> ownerInstant = Option.empty();
  if (instantTime.isPresent()) {
    ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get()));
  }
  this.txnManager.beginTransaction(ownerInstant, Option.empty());
  try {
    // Upgrade/downgrade the table layout first, then bootstrap the metadata table if requested;
    // both run inside the transaction per the synchronization note above.
    tryUpgrade(metaClient, instantTime);
    if (initialMetadataTableIfNecessary) {
      initMetadataTable(instantTime);
    }
  } finally {
    // Always release the transaction, even when upgrade or bootstrap fails.
    this.txnManager.endTransaction(ownerInstant);
  }
}

/**
 * Bootstrap the metadata table.
 *
 * <p>No-op by default; engine-specific clients override this to perform
 * the actual metadata table bootstrap.
 *
 * @param instantTime current inflight instant time
 */
protected void initMetadataTable(Option<String> instantTime) {
  // by default do nothing.
}

/**
* Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
Expand All @@ -1457,18 +1480,8 @@ protected final HoodieTable initTable(WriteOperationType operationType, Option<S
setWriteSchemaForDeletes(metaClient);
}

HoodieTable table;
Option<HoodieInstant> ownerInstant = Option.empty();
if (instantTime.isPresent()) {
ownerInstant = Option.of(new HoodieInstant(true, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime.get()));
}
this.txnManager.beginTransaction(ownerInstant, Option.empty());
try {
tryUpgrade(metaClient, instantTime);
table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
} finally {
this.txnManager.endTransaction(ownerInstant);
}
doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
HoodieTable table = createTable(config, hadoopConf, metaClient);

// Validate table properties
metaClient.validateTableProperties(config.getProps());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoop
return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
}

@Override
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
  // Flink-specific table backed by the supplied meta client.
  return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context, metaClient);
}

@Override
public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
Expand Down Expand Up @@ -291,10 +296,14 @@ public void initMetadataTable() {
HoodieFlinkTable<?> table = getHoodieTable();
if (config.isMetadataTableEnabled()) {
// initialize the metadata table path
try (HoodieBackedTableMetadataWriter metadataWriter = initMetadataWriter()) {
// do nothing
// guard the metadata writer with concurrent lock
try {
this.txnManager.getLockManager().lock();
initMetadataWriter().close();
} catch (Exception e) {
throw new HoodieException("Failed to initialize metadata table", e);
} finally {
this.txnManager.getLockManager().unlock();
}
// clean the obsolete index stats
table.deleteMetadataIndexIfNecessary();
Expand Down Expand Up @@ -478,16 +487,13 @@ private void completeClustering(
}

@Override
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
// Create a Hoodie table which encapsulated the commits and files visible
return getHoodieTable();
}

@Override
protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
protected void doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
  // Intentionally a no-op for Flink:
  //
  // - flink executes the upgrade/downgrade once when initializing the first instant on start up,
  //   so there is no need to execute the upgrade/downgrade on each write in streaming;
  //
  // - flink performs metadata table bootstrap on the coordinator when it starts up.
}

public void completeTableService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoop
return HoodieJavaTable.create(config, context);
}

@Override
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
  // Java-engine table backed by the supplied meta client.
  return HoodieJavaTable.create(config, context, metaClient);
}

@Override
public List<WriteStatus> upsert(List<HoodieRecord<T>> records,
String instantTime) {
Expand Down Expand Up @@ -228,13 +233,4 @@ protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstan
/**
 * Clustering is not supported by the Java write client.
 *
 * @throws HoodieNotSupportedException always
 */
public HoodieWriteMetadata<List<WriteStatus>> cluster(final String clusteringInstant, final boolean shouldComplete) {
  throw new HoodieNotSupportedException("Cluster is not supported in HoodieJavaClient");
}

@Override
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
// new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);

// Create a Hoodie table which encapsulated the commits and files visible
return HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static <T extends HoodieRecordPayload> HoodieJavaTable<T> create(HoodieWr
}

public static <T extends HoodieRecordPayload> HoodieJavaTable<T> create(HoodieWriteConfig config,
HoodieJavaEngineContext context,
HoodieEngineContext context,
HoodieTableMetaClient metaClient) {
switch (metaClient.getTableType()) {
case COPY_ON_WRITE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoop
return HoodieSparkTable.create(config, context);
}

@Override
protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, HoodieTableMetaClient metaClient) {
  // Spark-specific table backed by the supplied meta client.
  return HoodieSparkTable.create(config, context, metaClient);
}

@Override
public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
// Create a Hoodie table which encapsulated the commits and files visible
Expand Down Expand Up @@ -434,16 +439,11 @@ private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitM
}

@Override
protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
if (initialMetadataTableIfNecessary) {
// Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
// if it didn't exist before
// See https://issues.apache.org/jira/browse/HUDI-3343 for more details
initializeMetadataTable(instantTime);
}

// Create a Hoodie table which encapsulated the commits and files visible
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
protected void initMetadataTable(Option<String> instantTime) {
  // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
  // if it didn't exist before.
  // See https://issues.apache.org/jira/browse/HUDI-3343 for more details.
  initializeMetadataTable(instantTime);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.hudi.table;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieKey;
Expand Down Expand Up @@ -60,11 +59,11 @@ public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieW
.setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
.setFileSystemRetryConfig(config.getFileSystemRetryConfig())
.setProperties(config.getProps()).build();
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
return HoodieSparkTable.create(config, context, metaClient);
}

public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
HoodieSparkEngineContext context,
HoodieEngineContext context,
HoodieTableMetaClient metaClient) {
HoodieSparkTable<T> hoodieSparkTable;
switch (metaClient.getTableType()) {
Expand Down