Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public String validateSync(
}

private String getString(HoodieTableMetaClient target, HoodieTimeline targetTimeline, HoodieTableMetaClient source, long sourceCount, long targetCount, String sourceLatestCommit)
throws IOException {
throws IOException {
List<HoodieInstant> commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
.getInstants().collect(Collectors.toList());
if (commitsToCatchup.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.exception.HoodieIOException;
Expand Down Expand Up @@ -186,11 +187,11 @@ public void removeCorruptedPendingCleanAction() {
CleanerUtils.getCleanerPlan(client, instant);
} catch (AvroRuntimeException e) {
LOG.warn("Corruption found. Trying to remove corrupted clean instant file: " + instant);
FSUtils.deleteInstantFile(client.getFs(), client.getMetaPath(), instant);
HoodieActiveTimeline.deleteInstantFile(client.getFs(), client.getMetaPath(), instant);
} catch (IOException ioe) {
if (ioe.getMessage().contains("Not an Avro data file")) {
LOG.warn("Corruption found. Trying to remove corrupted clean instant file: " + instant);
FSUtils.deleteInstantFile(client.getFs(), client.getMetaPath(), instant);
HoodieActiveTimeline.deleteInstantFile(client.getFs(), client.getMetaPath(), instant);
} else {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
import org.apache.hudi.cli.testutils.HoodieTestCommitUtilities;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
Expand Down Expand Up @@ -71,6 +72,7 @@ public void init() throws Exception {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.forTable("test-trip-table").build();

// Create six commits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
import org.apache.hudi.cli.testutils.HoodieTestReplaceCommitMetadataGenerator;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
Expand Down Expand Up @@ -208,6 +209,7 @@ public void testShowArchivedCommits() throws Exception {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.forTable("test-trip-table").build();

// generate data and metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
Expand Down Expand Up @@ -158,6 +159,7 @@ private void generateArchive() throws IOException {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.forTable("test-trip-table").build();
// archive
HoodieTableMetaClient metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
Expand Down Expand Up @@ -241,13 +242,16 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String
}
}

/**
* Hook for any pre-commit actions, such as conflict resolution or updating the
* metadata table, invoked before a commit is finalized.
*
* <p>This default implementation is a no-op: conflict resolution is not yet
* supported for the Flink and Java engines, so engine-specific subclasses that
* need pre-commit behavior must override this method.
*
* @param instantTime commit instant time.
* @param metadata commit metadata for which pre commit is being invoked.
*/
protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
// TODO : Conflict resolution is not supported for Flink & Java engines
}

protected void syncTableMetadata() {
// no-op
// Create a Hoodie table after starting the transaction which encapsulated the commits and files visible.
// Important to create this after the lock to ensure latest commits show up in the timeline without need for reload
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's pretty hard to reason about these locking issues while inside the method. Can we please move these comments to the caller?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we already create a table instance from the caller? Can we not reuse it. Are you saying you need to see even the in-flight instants in this commit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was already there. In fact, I tried to re-use the existing table instance and tests started to fail; then I noticed these comments and reverted it back.

Copy link
Contributor Author

@nsivabalan nsivabalan Oct 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel adding this comment at the caller does not fit in well. This is what it looks like.

 try {
      // Precommit is meant for resolving conflicts if any and to update metadata table if enabled.
      // Create a Hoodie table within preCommit after starting the transaction which encapsulated the commits and files visible 
      // (instead of using an existing instance of Hoodie table). It is important to create this after the lock to ensure latest 
      // commits show up in the timeline without need for reload.
      preCommit(instantTime, metadata);
      commit(table, commitActionType, instantTime, metadata, stats);
      postCommit(table, metadata, instantTime, extraMetadata);
      ...

I feel having this within preCommit keeps it in the place where it is needed and used.
Let me know what you think.

HoodieTable table = createTable(config, hadoopConf);
table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime));
}

/**
Expand Down Expand Up @@ -404,16 +408,6 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp
HoodieTableMetaClient metaClient) {
setOperationType(writeOperationType);
this.lastCompletedTxnAndMetadata = TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
this.txnManager.beginTransaction(Option.of(new HoodieInstant(State.INFLIGHT, metaClient.getCommitActionType(), instantTime)), lastCompletedTxnAndMetadata
.isPresent()
? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
try {
if (writeOperationType != WriteOperationType.CLUSTER && writeOperationType != WriteOperationType.COMPACT) {
syncTableMetadata();
}
} finally {
this.txnManager.endTransaction();
}
this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
}

Expand Down Expand Up @@ -443,9 +437,6 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
// We cannot have unbounded commit files. Archive commits if we have to archive
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
archiveLog.archiveIfRequired(context);
if (operationType != null && operationType != WriteOperationType.CLUSTER && operationType != WriteOperationType.COMPACT) {
syncTableMetadata();
}
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(final Hoodi
if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy();
Stream<HoodieInstant> instantStream = resolutionStrategy.getCandidateInstants(table.getActiveTimeline(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant);
final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.get());
final ConcurrentOperation thisOperation = new ConcurrentOperation(currentTxnOwnerInstant.get(), thisCommitMetadata.orElse(new HoodieCommitMetadata()));
instantStream.forEach(instant -> {
try {
ConcurrentOperation otherOperation = new ConcurrentOperation(instant, table.getMetaClient());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1679,10 +1679,6 @@ public boolean isMetadataTableEnabled() {
return metadataConfig.enabled();
}

/**
* Whether file-listing results served from the metadata table should be
* verified; simply delegates to the underlying metadata config.
*
* @return {@code true} if file listing metadata validation is enabled.
*/
public boolean getFileListingMetadataVerify() {
return metadataConfig.validateFileListingMetadata();
}

/**
* Parallelism to use when inserting records into the metadata table, read
* from {@link HoodieMetadataConfig#INSERT_PARALLELISM_VALUE}.
*
* @return configured insert parallelism for the metadata table.
*/
public int getMetadataInsertParallelism() {
return getInt(HoodieMetadataConfig.INSERT_PARALLELISM_VALUE);
}
Expand Down
Loading