hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java (2 changes: 1 addition & 1 deletion)

@@ -41,8 +41,8 @@ public class HoodieCLI {
   public static FileSystem fs;
   public static CLIState state = CLIState.INIT;
   public static String basePath;
-  protected static HoodieTableMetaClient tableMetadata;
   public static HoodieTableMetaClient syncTableMetadata;
+  protected static HoodieTableMetaClient tableMetadata;
   public static TimelineLayoutVersion layoutVersion;
   public static TempViewProvider tempViewProvider;
========

@@ -89,7 +89,7 @@ public String validateSync(
   }

   private String getString(HoodieTableMetaClient target, HoodieTimeline targetTimeline, HoodieTableMetaClient source, long sourceCount, long targetCount, String sourceLatestCommit)
-      throws IOException {
+      throws IOException {
     List<HoodieInstant> commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
         .getInstants().collect(Collectors.toList());
     if (commitsToCatchup.isEmpty()) {
========

@@ -24,6 +24,7 @@
 import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
 import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
 import org.apache.hudi.cli.testutils.HoodieTestCommitUtilities;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;

@@ -71,6 +72,7 @@ public void init() throws Exception {
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
         .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
         .forTable("test-trip-table").build();

     // Create six commits
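The three CLI test files in this diff all pin the metadata table off through the same builder call. Below is a minimal, self-contained sketch of that configuration in isolation, assuming the builder APIs exactly as they appear in the hunks above; the base path, schema, and table name are placeholders:

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class MetadataDisabledConfigSketch {
  public static void main(String[] args) {
    // Placeholder schema: an Avro record with no fields.
    String schema = "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[]}";

    // Same builder chain as the tests above, with the metadata table
    // explicitly disabled via HoodieMetadataConfig.
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/test-trip-table") // placeholder base path
        .withSchema(schema)
        .withParallelism(2, 2)
        .withCompactionConfig(
            HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
        .forTable("test-trip-table")
        .build();

    // isMetadataTableEnabled() reads the flag back; see the HoodieWriteConfig
    // hunk at the end of this diff.
    System.out.println("metadata table enabled: " + cfg.isMetadataTableEnabled());
  }
}

Disabling the metadata table keeps these archival tests scoped to the data table's own timeline, which is presumably why the PR adds the flag to each of them.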
========

@@ -25,6 +25,7 @@
 import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
 import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
 import org.apache.hudi.cli.testutils.HoodieTestReplaceCommitMetadataGenerator;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;

@@ -208,6 +209,7 @@ public void testShowArchivedCommits() throws Exception {
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
         .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
         .forTable("test-trip-table").build();

     // generate data and metadata
========

@@ -24,6 +24,7 @@
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
 import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;

@@ -158,6 +159,7 @@ private void generateArchive() throws IOException {
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
         .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
         .forTable("test-trip-table").build();
     // archive
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
========

@@ -61,6 +61,7 @@
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;

@@ -184,10 +185,23 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
     HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds,
         extraMetadata, operationType, config.getWriteSchema(), commitActionType);
     HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient, config);
+    if (!config.getBasePath().endsWith("metadata")) {
+      LOG.warn(" ABWC. Going to start transaction " + instantTime);
+    }
     this.txnManager.beginTransaction(Option.of(new HoodieInstant(State.INFLIGHT, table.getMetaClient().getCommitActionType(), instantTime)),
         lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
     try {
       preCommit(instantTime, metadata);
+      table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime));
+      if (!config.getBasePath().endsWith("metadata")) {
+        LOG.warn(" ABWC. committing " + instantTime);
+        metadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> {
+          LOG.warn(" for partition " + partitionStatName);
+          for (HoodieWriteStat stat : writeStats) {
+            LOG.warn(" file info " + stat.getFileId() + ", path " + stat.getPath() + ", total bytes written " + stat.getTotalWriteBytes());
+          }
+        });
+      }
       commit(table, commitActionType, instantTime, metadata, stats);
       postCommit(table, metadata, instantTime, extraMetadata);
       LOG.info("Committed " + instantTime);

@@ -196,6 +210,9 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
       throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime, e);
     } finally {
       this.txnManager.endTransaction();
+      if (!config.getBasePath().endsWith("metadata")) {
+        LOG.warn(" AHWC transaction completed for " + instantTime);
+      }
     }
     // do this outside of lock since compaction, clustering can be time taking and we don't need a lock for the entire execution period
     runTableServicesInline(table, metadata, extraMetadata);

@@ -220,7 +237,11 @@ protected void commit(HoodieTable table, String commitActionType, String instant
         Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
   }

-  protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf);
+  protected HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
+    return createTable(config, hadoopConf, false);
+  }
+
+  protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline);

   void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
     try {

@@ -242,10 +263,6 @@ protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
     // TODO : Conflict resolution is not supported for Flink & Java engines
   }

-  protected void syncTableMetadata() {
-    // no-op
-  }
-
   /**
    * Filter out HoodieRecords that already exists in the output folder. This is useful in deduplication.
    *

@@ -273,6 +290,7 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
   public void rollbackFailedBootstrap() {
     LOG.info("Rolling back pending bootstrap if present");
     HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
+    table.getHoodieView().sync();
     HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
     Option<String> instant = Option.fromJavaOptional(
         inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());

@@ -400,16 +418,6 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp
       HoodieTableMetaClient metaClient) {
     setOperationType(writeOperationType);
     this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
-    this.txnManager.beginTransaction(Option.of(new HoodieInstant(State.INFLIGHT, metaClient.getCommitActionType(), instantTime)), lastCompletedTxnAndMetadata
-        .isPresent()
-        ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
-    try {
-      if (writeOperationType != WriteOperationType.CLUSTER && writeOperationType != WriteOperationType.COMPACT) {
-        syncTableMetadata();
-      }
-    } finally {
-      this.txnManager.endTransaction();
-    }
     this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
   }

@@ -439,9 +447,6 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
       // We cannot have unbounded commit files. Archive commits if we have to archive
       HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
       archiveLog.archiveIfRequired(context);
-      if (operationType != null && operationType != WriteOperationType.CLUSTER && operationType != WriteOperationType.COMPACT) {
-        syncTableMetadata();
-      }
     } catch (IOException ioe) {
       throw new HoodieIOException(ioe.getMessage(), ioe);
     } finally {

@@ -451,6 +456,7 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me

   protected void runTableServicesInline(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
     if (config.inlineTableServices()) {
+      table.getHoodieView().sync();
       // Do an inline compaction if enabled
       if (config.inlineCompactionEnabled()) {
         runAnyPendingCompactions(table);

@@ -957,17 +963,17 @@ private Option<String> scheduleTableServiceInternal(String instantTime, Option<M
     switch (tableServiceType) {
       case CLUSTER:
         LOG.info("Scheduling clustering at instant time :" + instantTime);
-        Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf)
+        Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf, true)
            .scheduleClustering(context, instantTime, extraMetadata);
        return clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
      case COMPACT:
        LOG.info("Scheduling compaction at instant time :" + instantTime);
-        Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf)
+        Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf, true)
            .scheduleCompaction(context, instantTime, extraMetadata);
        return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
      case CLEAN:
        LOG.info("Scheduling cleaning at instant time :" + instantTime);
-        Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf)
+        Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf, true)
            .scheduleCleaning(context, instantTime, extraMetadata);
        return cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
      default:
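The createTable() change above turns the old abstract two-argument factory into a concrete delegate, so engine-specific clients only implement the new three-argument variant, and table services can opt into a timeline refresh. A minimal sketch of that overload pattern with hypothetical names (WriteClientSketch and its type parameter stand in for the write client's generics):

// The two-argument overload preserves existing call sites; the abstract
// three-argument variant is what subclasses now implement.
abstract class WriteClientSketch<TABLE> {

  protected TABLE createTable(String config, String hadoopConf) {
    // Old behavior: no timeline refresh.
    return createTable(config, hadoopConf, false);
  }

  protected abstract TABLE createTable(String config, String hadoopConf, boolean refreshTimeline);
}

public class CreateTableOverloadSketch extends WriteClientSketch<String> {

  @Override
  protected String createTable(String config, String hadoopConf, boolean refreshTimeline) {
    // A real engine client would construct a HoodieTable here and, when
    // refreshTimeline is true, sync its file-system view first.
    return refreshTimeline ? "table(timeline refreshed)" : "table";
  }

  public static void main(String[] args) {
    CreateTableOverloadSketch client = new CreateTableOverloadSketch();
    System.out.println(client.createTable("cfg", "hadoopConf"));       // old two-arg path
    System.out.println(client.createTable("cfg", "hadoopConf", true)); // table-service path
  }
}

In the hunks shown, only scheduleTableServiceInternal() passes true, so regular writes keep the old non-refreshing behavior.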
========

@@ -49,7 +49,7 @@ public TransactionManager(HoodieWriteConfig config, FileSystem fs) {
   public synchronized void beginTransaction() {
     if (supportsOptimisticConcurrency) {
       LOG.info("Transaction starting without a transaction owner");
-      lockManager.lock();
+      lockManager.lock("empty");
       LOG.info("Transaction started");
     }
   }

@@ -61,16 +61,16 @@ public synchronized void beginTransaction(Option<HoodieInstant> currentTxnOwnerI
       LOG.info("Latest completed transaction instant " + lastCompletedTxnOwnerInstant);
       this.currentTxnOwnerInstant = currentTxnOwnerInstant;
       LOG.info("Transaction starting with transaction owner " + currentTxnOwnerInstant);
-      lockManager.lock();
+      lockManager.lock(currentTxnOwnerInstant.isPresent() ? currentTxnOwnerInstant.get().getTimestamp() : "");
       LOG.info("Transaction started");
     }
   }

   public synchronized void endTransaction() {
     if (supportsOptimisticConcurrency) {
       LOG.info("Transaction ending with transaction owner " + currentTxnOwnerInstant);
-      lockManager.unlock();
-      LOG.info("Transaction ended");
+      lockManager.unlock(currentTxnOwnerInstant.isPresent() ? currentTxnOwnerInstant.get().getTimestamp() : "");
+      LOG.info("Transaction ended ");
       this.lastCompletedTxnOwnerInstant = Option.empty();
       lockManager.resetLatestCompletedWriteInstant();
     }
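TransactionManager now threads a transaction-owner string (the inflight instant's timestamp, or a placeholder) into every lock() and unlock() call so lock logs can attribute contention to a writer. A minimal, self-contained sketch of that flow; LockManagerSketch is hypothetical and stands in for Hudi's LockManager:

import java.util.Optional;

// LockManagerSketch only demonstrates the new owner-string parameter
// on lock()/unlock(); it is not Hudi's actual LockManager.
class LockManagerSketch {
  void lock(String currentOwner) {
    System.out.println("lock acquired for owner " + currentOwner);
  }

  void unlock(String currentOwner) {
    System.out.println("lock released for owner " + currentOwner);
  }
}

public class TransactionOwnerSketch {
  public static void main(String[] args) {
    LockManagerSketch lockManager = new LockManagerSketch();
    // The owner is the inflight instant's timestamp when present,
    // mirroring beginTransaction()/endTransaction() above.
    Optional<String> ownerInstant = Optional.of("20210501120000"); // placeholder instant time

    lockManager.lock(ownerInstant.isPresent() ? ownerInstant.get() : "empty");
    try {
      // ... work guarded by the table lock ...
    } finally {
      lockManager.unlock(ownerInstant.orElse(""));
    }
  }
}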
========

@@ -18,10 +18,6 @@

 package org.apache.hudi.client.transaction.lock;

-import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.lock.LockProvider;

@@ -30,9 +26,15 @@
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieLockException;
+
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;

 import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY;
 import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;

@@ -56,7 +58,7 @@ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
     this.lockConfiguration = new LockConfiguration(writeConfig.getProps());
   }

-  public void lock() {
+  public void lock(String currentOwner) {
     if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
       LockProvider lockProvider = getLockProvider();
       int retryCount = 0;

@@ -67,25 +69,31 @@ public void lock() {
         try {
           acquired = lockProvider.tryLock(writeConfig.getLockAcquireWaitTimeoutInMs(), TimeUnit.MILLISECONDS);
           if (acquired) {
+            LOG.warn("Acquiring lock succeeded for " + currentOwner);
             break;
           }
-          LOG.info("Retrying to acquire lock...");
+          LOG.warn("Retrying to acquire lock... " + currentOwner + " sleeping for " + waitTimeInMs
+              + ", cur count " + retryCount + ", max retries " + retries);
           Thread.sleep(waitTimeInMs);
           retryCount++;
+          LOG.warn("Going into next loop to acquire lock... " + currentOwner);
         } catch (InterruptedException e) {
           if (retryCount >= retries) {
+            LOG.warn(" Interrupted exception thrown. retry count exceeded for " + currentOwner);
             throw new HoodieLockException("Unable to acquire lock, lock object ", e);
           }
         }
       }
       if (!acquired) {
-        throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock());
+        LOG.warn(" Failed to acquire lock " + currentOwner);
+        throw new HoodieLockException("Unable to acquire lock, lock object " + lockProvider.getLock() + " owner : " + currentOwner);
       }
     }
   }

-  public void unlock() {
+  public void unlock(String currentOwner) {
     if (writeConfig.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+      LOG.warn("Unlocking " + currentOwner);
       getLockProvider().unlock();
     }
   }
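The lock() body above is a bounded retry loop around a pluggable LockProvider. A runnable sketch of the same loop using a plain ReentrantLock in place of the provider; the retry count and timeouts are placeholder values, not Hudi's LockConfiguration defaults:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Bounded retry loop: try to acquire within a timeout, sleep between
// attempts, and fail with the owner in the message once retries run out.
public class RetryLockSketch {
  public static void main(String[] args) throws InterruptedException {
    ReentrantLock lockProvider = new ReentrantLock(); // stand-in for LockProvider
    String currentOwner = "20210501120000"; // placeholder owner instant
    int retries = 3;
    long waitTimeInMs = 100;
    long acquireTimeoutMs = 1000;

    boolean acquired = false;
    for (int retryCount = 0; retryCount <= retries; retryCount++) {
      acquired = lockProvider.tryLock(acquireTimeoutMs, TimeUnit.MILLISECONDS);
      if (acquired) {
        System.out.println("Acquiring lock succeeded for " + currentOwner);
        break;
      }
      System.out.println("Retrying to acquire lock... " + currentOwner
          + ", cur count " + retryCount + ", max retries " + retries);
      Thread.sleep(waitTimeInMs);
    }
    if (!acquired) {
      throw new IllegalStateException("Unable to acquire lock, owner: " + currentOwner);
    }
    try {
      // ... critical section ...
    } finally {
      lockProvider.unlock();
    }
  }
}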
========

@@ -58,7 +58,7 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(final Hoodi
     if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
       ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy();
       Stream<HoodieInstant> instantStream = resolutionStrategy.getCandidateInstants(table.getActiveTimeline(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant);
-      final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.get());
+      final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.orElse(new HoodieCommitMetadata()));
       instantStream.forEach(instant -> {
         try {
           ConcurrentOperation otherOperation = new ConcurrentOperation(instant, table.getMetaClient());
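The one-line change above swaps thisCommitMetadata.get() for orElse(new HoodieCommitMetadata()), so conflict resolution no longer throws when the current transaction carries no commit metadata. A minimal sketch of the difference using java.util.Optional, which exposes the same get()/orElse() pair as the Option type used in the hunk; MetadataSketch is a hypothetical stand-in for HoodieCommitMetadata:

import java.util.Optional;

public class ConflictMetadataSketch {
  static class MetadataSketch {
    @Override
    public String toString() {
      return "empty metadata";
    }
  }

  public static void main(String[] args) {
    Optional<MetadataSketch> thisCommitMetadata = Optional.empty();

    // Before the fix: thisCommitMetadata.get() throws NoSuchElementException
    // whenever the Option is empty.
    // After the fix: fall back to a fresh, empty metadata object.
    MetadataSketch resolved = thisCommitMetadata.orElse(new MetadataSketch());
    System.out.println("resolved = " + resolved);
  }
}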
========

@@ -1675,10 +1675,6 @@ public boolean isMetadataTableEnabled() {
     return metadataConfig.enabled();
   }

-  public boolean getFileListingMetadataVerify() {
-    return metadataConfig.validateFileListingMetadata();
-  }
-
   public int getMetadataInsertParallelism() {
     return getInt(HoodieMetadataConfig.INSERT_PARALLELISM_VALUE);
   }