2 changes: 1 addition & 1 deletion hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
@@ -42,7 +42,7 @@ public class HoodieCLI {
public static CLIState state = CLIState.INIT;
public static String basePath;
protected static HoodieTableMetaClient tableMetadata;
- public static HoodieTableMetaClient syncTableMetadata;
+ public static HoodieTableMetaClient metaClient;
public static TimelineLayoutVersion layoutVersion;
private static TempViewProvider tempViewProvider;

@@ -40,7 +40,7 @@ public String getPrompt() {
case TABLE:
return "hudi:" + tableName + "->";
case SYNC:
return "hudi:" + tableName + " <==> " + HoodieCLI.syncTableMetadata.getTableConfig().getTableName() + "->";
return "hudi:" + tableName + " <==> " + HoodieCLI.metaClient.getTableConfig().getTableName() + "->";
default:
return "hudi:" + tableName + "->";
}
@@ -442,10 +442,10 @@ public String compareCommits(@CliOption(key = {"path"}, help = "Path of the tabl

@CliCommand(value = "commits sync", help = "Compare commits with another Hoodie table")
public String syncCommits(@CliOption(key = {"path"}, help = "Path of the table to compare to") final String path) {
- HoodieCLI.syncTableMetadata = HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(path).build();
+ HoodieCLI.metaClient = HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(path).build();
HoodieCLI.state = HoodieCLI.CLIState.SYNC;
return "Load sync state between " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and "
- + HoodieCLI.syncTableMetadata.getTableConfig().getTableName();
+ + HoodieCLI.metaClient.getTableConfig().getTableName();
}

/*
@@ -56,10 +56,10 @@ public String validateSync(
@CliOption(key = {"hivePass"}, mandatory = true, unspecifiedDefaultValue = "",
help = "hive password to connect to") final String hivePass)
throws Exception {
- if (HoodieCLI.syncTableMetadata == null) {
+ if (HoodieCLI.metaClient == null) {
throw new HoodieException("Sync validate request target table not null.");
}
- HoodieTableMetaClient target = HoodieCLI.syncTableMetadata;
+ HoodieTableMetaClient target = HoodieCLI.metaClient;
HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline();
HoodieTableMetaClient source = HoodieCLI.getTableMetaClient();
HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsTimeline();
@@ -60,6 +60,7 @@
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
+ import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
@@ -187,6 +188,7 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
try {
preCommit(instantTime, metadata);
+ table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime));
commit(table, commitActionType, instantTime, metadata, stats);
postCommit(table, metadata, instantTime, extraMetadata);
LOG.info("Committed " + instantTime);
@@ -241,10 +243,6 @@ protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
// TODO : Conflict resolution is not supported for Flink & Java engines
}

- protected void syncTableMetadata() {
- // no-op
- }
-
/**
* Filter out HoodieRecords that already exists in the output folder. This is useful in deduplication.
*
@@ -399,14 +397,6 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp
HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
- this.txnManager.beginTransaction(Option.of(new HoodieInstant(State.INFLIGHT, metaClient.getCommitActionType(), instantTime)), lastCompletedTxnAndMetadata
- .isPresent()
- ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
- try {
- syncTableMetadata();
- } finally {
- this.txnManager.endTransaction();
- }
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
}

@@ -435,7 +425,6 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
archiveLog.archiveIfRequired(context);
autoCleanOnCommit();
- syncTableMetadata();
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
} finally {
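
Note on the commitStats() change above: the metadata table is now updated inside the commit path itself, replacing the removed syncTableMetadata() hook. A minimal sketch of that shape with the table's writer stubbed out; the Option wrapper and the update(metadata, instantTime) signature are taken from the diff, everything else here is illustrative.

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;

class MetadataCommitHookSketch {
  // In the real code this would come from the table (HoodieTable#getMetadataWriter()); stubbed here.
  private final Option<HoodieTableMetadataWriter> metadataWriter = Option.empty();

  void applyMetadataUpdate(HoodieCommitMetadata metadata, String instantTime) {
    // Same shape as the line added in commitStats(): push the commit's stats into the
    // internal metadata table before the commit is finalized on the data timeline.
    metadataWriter.ifPresent(w -> w.update(metadata, instantTime));
  }
}
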
@@ -58,7 +58,7 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(final Hoodi
if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy();
Stream<HoodieInstant> instantStream = resolutionStrategy.getCandidateInstants(table.getActiveTimeline(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant);
- final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.get());
+ final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.orElse(new HoodieCommitMetadata()));
instantStream.forEach(instant -> {
try {
ConcurrentOperation otherOperation = new ConcurrentOperation(instant, table.getMetaClient());
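
The one-line change above swaps thisCommitMetadata.get() for orElse(new HoodieCommitMetadata()): with Hudi's Option, get() throws when the value is absent, while orElse() substitutes a default, so conflict resolution no longer fails for a transaction that carries no commit metadata. A tiny sketch of that difference (class and method names are illustrative).

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.util.Option;

class OptionFallbackSketch {
  static HoodieCommitMetadata metadataOrEmpty(Option<HoodieCommitMetadata> maybe) {
    // maybe.get() would throw when empty; orElse() returns the supplied fallback instead.
    return maybe.orElse(new HoodieCommitMetadata());
  }
}
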
@@ -209,6 +209,11 @@ public HoodieLockConfig.Builder withLockProvider(Class<? extends LockProvider> l
return this;
}

+ public HoodieLockConfig.Builder withLockProviderClass(String lockProviderClassName) {
+ lockConfig.setValue(LOCK_PROVIDER_CLASS_PROP, lockProviderClassName);
+ return this;
+ }
+
public HoodieLockConfig.Builder withHiveDatabaseName(String databaseName) {
lockConfig.setValue(HIVE_DATABASE_NAME_PROP, databaseName);
return this;
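
The new withLockProviderClass(String) overload lets the lock provider be supplied as a class name (for example, read from job properties) instead of a compile-time Class. A usage sketch, assuming the newBuilder()/build() pattern the other Hoodie*Config classes follow; the provider class name shown is illustrative only.

import org.apache.hudi.config.HoodieLockConfig;

class LockConfigSketch {
  static HoodieLockConfig fromClassName(String lockProviderClassName) {
    // lockProviderClassName typically comes from configuration rather than code,
    // e.g. "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider".
    return HoodieLockConfig.newBuilder()
        .withLockProviderClass(lockProviderClassName) // overload added in this change
        .build();
  }
}
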
@@ -1220,10 +1220,6 @@ public boolean useFileListingMetadata() {
return metadataConfig.useFileListingMetadata();
}

- public boolean getFileListingMetadataVerify() {
- return metadataConfig.validateFileListingMetadata();
- }
-
public int getMetadataInsertParallelism() {
return getInt(HoodieMetadataConfig.METADATA_INSERT_PARALLELISM_PROP);
}
@@ -1305,6 +1301,17 @@ public boolean allowEmptyCommit() {
return getBooleanOrDefault(ALLOW_EMPTY_COMMIT);
}

+ /**
+  * Record level index configs.
+  */
+ public boolean isRecordLevelIndexEnabled() {
+ return metadataConfig.isRecordLevelIndexEnabled();
+ }
+
+ public int getRecordLevelIndexShardCount() {
+ return metadataConfig.getRecordLevelIndexShardCount();
+ }
+
public static class Builder {

protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
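
The two accessors added above simply delegate to HoodieMetadataConfig. A hypothetical caller (not part of this change) showing how write-path code could gate record level index work on them; the hash-mod sharding is an assumption, not something the PR prescribes.

import org.apache.hudi.config.HoodieWriteConfig;

class RecordLevelIndexShardingSketch {
  // Returns the shard a record key would map to, or -1 if the record level index is disabled.
  static int shardFor(HoodieWriteConfig writeConfig, String recordKey) {
    if (!writeConfig.isRecordLevelIndexEnabled()) {
      return -1; // fall back to the configured index type
    }
    int shardCount = writeConfig.getRecordLevelIndexShardCount();
    return Math.floorMod(recordKey.hashCode(), shardCount);
  }
}
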
@@ -22,9 +22,11 @@
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.engine.HoodieEngineContext;
+ import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
+ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.table.HoodieTable;
@@ -42,10 +44,26 @@
@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
public abstract class HoodieIndex<T extends HoodieRecordPayload, I, K, O> implements Serializable {

+ // Metric names
+ protected static final String UPDATE_DURATION = "update.duration";
+ protected static final String UPDATE_COUNT = "update.count";
+ protected static final String INSERT_COUNT = "insert.count";
+ protected static final String DELETE_COUNT = "delete.count";
+ protected static final String TAG_LOCATION_DURATION = "tag.duration";
+ protected static final String TAG_LOCATION_COUNT = "tag.count";
+ protected static final String TAG_LOCATION_NUM_PARTITIONS = "tag.num_partitions";
+ protected static final String TAG_LOCATION_HITS = "tag.hits";
+
+ // Metric registry
+ protected Option<Registry> registry;
+
protected final HoodieWriteConfig config;

protected HoodieIndex(HoodieWriteConfig config) {
this.config = config;
+ if (config.getTableName() != null) {
+ this.registry = Option.of(Registry.getRegistry(config.getTableName() + "." + this.getClass().getSimpleName()));
+ }
}

/**
@@ -104,6 +122,6 @@ public void close() {
}

public enum IndexType {
- HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE
+ HBASE, INMEMORY, BLOOM, GLOBAL_BLOOM, SIMPLE, GLOBAL_SIMPLE, RECORD_LEVEL
}
}
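
With the constructor change above, every index implementation gets an Option-wrapped, per-table metrics Registry plus a shared set of metric-name constants. A sketch of how a subclass might report into it; Registry.add(name, value) is assumed to behave as a simple counter, and the class and metric usage here are illustrative rather than taken from the PR.

import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.util.Option;

class IndexMetricsSketch {
  // In a real index this would be the registry created in HoodieIndex's constructor.
  private final Option<Registry> registry =
      Option.of(Registry.getRegistry("my_table.IndexMetricsSketch"));

  void reportTagLocation(long durationMs, long taggedRecords) {
    registry.ifPresent(r -> {
      r.add("tag.duration", durationMs);  // TAG_LOCATION_DURATION
      r.add("tag.count", taggedRecords);  // TAG_LOCATION_COUNT
    });
  }
}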