Merged

27 commits
0b79b1d
Refactored MT records composition from `HoodieCommitMetadata` to alwa…
Feb 2, 2022
a8e3cf0
Fixed `HoodieMetadataPayload` to properly combine file-sizes
Feb 2, 2022
4a2a40b
Added TODO
Feb 2, 2022
682bdbc
Tidying up
Feb 3, 2022
5d5641a
Fixed tests' incorrect `WriteStat` generation
Feb 3, 2022
ff6b34d
Streamlined FS Metadata merging logic (to simply always prefer more r…
Feb 3, 2022
b2ebb92
Added validations
Feb 3, 2022
fe16268
Fixed validation seq to only validate file listing records
Feb 3, 2022
11e5ecd
Fix `HoodieMetadataPayload` sequence to appropriately handle tombstones
Feb 4, 2022
eca9828
Tidying up
Feb 4, 2022
ef1ab67
Bubbled the assertion for positive file-sizes to be conducted upon MT…
Feb 7, 2022
8dc1027
Fixing Flink tests
Feb 7, 2022
410f4d7
Fixed `HoodieMetadataPayload` operation to become commutative
Feb 11, 2022
0c2028f
Fixed compilation
Feb 11, 2022
771a85d
Consolidated table init-seq w/in `BaseHoodieWriteClient` to avoid dup…
Feb 16, 2022
3a0e432
Added `JavaUpgradeDowngradHelper`;
Feb 16, 2022
89aa0ca
Fixed contextual timer setting;
Feb 16, 2022
72d4237
Make sure every `WriteClient` operation initializes Hudi Table properly
Feb 16, 2022
44d3c26
Fixing tests creating invalid base files
Feb 16, 2022
b211322
Revisited rollback related APIs to only invoke `initTable` seq for pu…
Feb 23, 2022
eab17ed
Fixed tests
Feb 23, 2022
2da986b
Missing license
Feb 24, 2022
834260e
Fixing compilation
Feb 25, 2022
cfcaf5d
Removed table init seq from `rollbackFailedWrite`
Feb 25, 2022
0ac5c2a
Fixing tests
Mar 1, 2022
157095d
Cleaning up
Mar 4, 2022
b3136ad
Fixing compilation
Mar 4, 2022
@@ -18,6 +18,8 @@

package org.apache.hudi.client;

import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -45,6 +47,7 @@
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
@@ -73,9 +76,8 @@
import org.apache.hudi.table.action.rollback.RollbackUtils;
import org.apache.hudi.table.action.savepoint.SavepointHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;

import com.codahale.metrics.Timer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -107,15 +109,16 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(BaseHoodieWriteClient.class);

protected final transient HoodieMetrics metrics;
private final transient HoodieIndex<?, ?> index;
private final SupportsUpgradeDowngrade upgradeDowngradeHelper;
private transient WriteOperationType operationType;
private transient HoodieWriteCommitCallback commitCallback;

protected final transient HoodieMetrics metrics;
protected transient Timer.Context writeTimer = null;
protected transient Timer.Context compactionTimer;
protected transient Timer.Context clusteringTimer;

private transient WriteOperationType operationType;
private transient HoodieWriteCommitCallback commitCallback;
protected transient AsyncCleanerService asyncCleanerService;
protected transient AsyncArchiveService asyncArchiveService;
protected final TransactionManager txnManager;
@@ -125,25 +128,32 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* Create a write client, with new hudi index.
* @param context HoodieEngineContext
* @param writeConfig instance of HoodieWriteConfig
* @param upgradeDowngradeHelper engine-specific instance of {@link SupportsUpgradeDowngrade}
*/
@Deprecated
public BaseHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
this(context, writeConfig, Option.empty());
public BaseHoodieWriteClient(HoodieEngineContext context,
HoodieWriteConfig writeConfig,
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
this(context, writeConfig, Option.empty(), upgradeDowngradeHelper);
}

/**
* Create a write client, allows to specify all parameters.
*
* @param context HoodieEngineContext
* @param writeConfig instance of HoodieWriteConfig
* @param writeConfig instance of HoodieWriteConfig
* @param timelineService Timeline Service that runs as part of write client.
*/
@Deprecated
public BaseHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig,
Option<EmbeddedTimelineService> timelineService) {
public BaseHoodieWriteClient(HoodieEngineContext context,
HoodieWriteConfig writeConfig,
Option<EmbeddedTimelineService> timelineService,
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
super(context, writeConfig, timelineService);
this.metrics = new HoodieMetrics(config);
this.index = createIndex(writeConfig);
this.txnManager = new TransactionManager(config, fs);
this.upgradeDowngradeHelper = upgradeDowngradeHelper;
}

protected abstract HoodieIndex<?, ?> createIndex(HoodieWriteConfig writeConfig);
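For context on the new signature: engine-specific clients now pass their SupportsUpgradeDowngrade implementation into this constructor instead of running the upgrade/downgrade sequence themselves. A minimal sketch of what a concrete caller could look like, assuming a Spark client and the SparkUpgradeDowngradeHelper singleton quoted later in the review thread (the fragment is illustrative, not part of this diff):

  // Illustrative constructor fragment of an engine-specific write client (sketch only):
  public SparkRDDWriteClient(HoodieEngineContext context,
                             HoodieWriteConfig writeConfig,
                             Option<EmbeddedTimelineService> timelineService) {
    // The base client now owns upgrade/downgrade sequencing; the engine only
    // supplies its helper instance.
    super(context, writeConfig, timelineService, SparkUpgradeDowngradeHelper.getInstance());
  }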
@@ -291,15 +301,15 @@ public void bootstrap(Option<Map<String, String>> extraMetadata) {
if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
throw new HoodieException("Cannot bootstrap the table in multi-writer mode");
}
HoodieTable<T, I, K, O> table = getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS);
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UPSERT, Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS));
rollbackFailedBootstrap();
table.bootstrap(context, extraMetadata);
}

/**
* Main API to rollback failed bootstrap.
*/
public void rollbackFailedBootstrap() {
protected void rollbackFailedBootstrap() {
LOG.info("Rolling back pending bootstrap if present");
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
@@ -628,33 +638,19 @@ public void deleteSavepoint(String savepointTime) {
* @return true if the savepoint was restored to successfully
*/
public void restoreToSavepoint(String savepointTime) {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty());
SavepointHelpers.validateSavepointPresence(table, savepointTime);
restoreToInstant(savepointTime);
SavepointHelpers.validateSavepointRestore(table, savepointTime);
}

@Deprecated
public boolean rollback(final String commitInstantTime) throws HoodieRollbackException {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty());
Option<HoodiePendingRollbackInfo> pendingRollbackInfo = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime);
return rollback(commitInstantTime, pendingRollbackInfo, false);
}

/**
* @Deprecated
* Rollback the inflight record changes with the given commit time. This
* will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String)}
* Adding this api for backwards compatability.
* @param commitInstantTime Instant time of the commit
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
* @throws HoodieRollbackException if rollback cannot be performed successfully
*/
@Deprecated
public boolean rollback(final String commitInstantTime, boolean skipLocking) throws HoodieRollbackException {
return rollback(commitInstantTime, Option.empty(), skipLocking);
}

/**
* @Deprecated
* Rollback the inflight record changes with the given commit time. This
@@ -711,7 +707,7 @@ public HoodieRestoreMetadata restoreToInstant(final String instantTime) throws H
final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
Timer.Context timerContext = metrics.getRollbackCtx();
try {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN, Option.empty());
Option<HoodieRestorePlan> restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime);
if (restorePlanOption.isPresent()) {
HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
@@ -988,15 +984,15 @@ protected Map<String, Option<HoodiePendingRollbackInfo>> getPendingRollbackInfos
/**
* Rollback all failed writes.
*/
public Boolean rollbackFailedWrites() {
protected Boolean rollbackFailedWrites() {
return rollbackFailedWrites(false);
}

/**
* Rollback all failed writes.
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
*/
public Boolean rollbackFailedWrites(boolean skipLocking) {
protected Boolean rollbackFailedWrites(boolean skipLocking) {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty());
Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient());
@@ -1246,17 +1242,79 @@ public HoodieMetrics getMetrics() {
}

/**
* Get HoodieTable and init {@link Timer.Context}.
* Instantiates engine-specific instance of {@link HoodieTable} as well as performs necessary
* bootstrapping operations (for ex, validating whether Metadata Table has to be bootstrapped)
*
* NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY OPERATIONS
* NOT REQUIRING EXTERNAL SYNCHRONIZATION
*
* @param operationType write operation type
* @param metaClient instance of {@link HoodieTableMetaClient}
* @param instantTime current inflight instant time
* @return HoodieTable
* @return instantiated {@link HoodieTable}
*/
protected abstract HoodieTable<T, I, K, O> getTableAndInitCtx(WriteOperationType operationType, String instantTime);
protected abstract HoodieTable<T, I, K, O> doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime);

/**
* Sets write schema from last instant since deletes may not have schema set in the config.
* Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
* operations such as:
*
* NOTE: This method is engine-agnostic and SHOULD NOT be overloaded, please check on
* {@link #doInitTable(HoodieTableMetaClient, Option<String>)} instead
*
* <ul>
* <li>Checking whether upgrade/downgrade is required</li>
* <li>Bootstrapping Metadata Table (if required)</li>
* <li>Initializing metrics contexts</li>
* </ul>
*/
protected final HoodieTable<T, I, K, O> initTable(WriteOperationType operationType, Option<String> instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
// Setup write schemas for deletes
if (operationType == WriteOperationType.DELETE) {
setWriteSchemaForDeletes(metaClient);
}

HoodieTable<T, I, K, O> table;

this.txnManager.beginTransaction();
try {
tryUpgrade(metaClient, instantTime);
table = doInitTable(metaClient, instantTime);
} finally {
this.txnManager.endTransaction();
}

// Validate table properties
metaClient.validateTableProperties(config.getProps(), operationType);
// Make sure that FS View is in sync
table.getHoodieView().sync();

switch (operationType) {
case INSERT:
case INSERT_PREPPED:
case UPSERT:
case UPSERT_PREPPED:
case BULK_INSERT:
case BULK_INSERT_PREPPED:
case INSERT_OVERWRITE:
case INSERT_OVERWRITE_TABLE:
setWriteTimer(table);
break;
case CLUSTER:
clusteringTimer = metrics.getClusteringCtx();
break;
case COMPACT:
compactionTimer = metrics.getCompactionCtx();
break;
default:
}

return table;
}

/**
* Sets write schema from last instant since deletes may not have schema set in the config.
*/
protected void setWriteSchemaForDeletes(HoodieTableMetaClient metaClient) {
try {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
Expand Down Expand Up @@ -1301,4 +1359,33 @@ public void close() {
this.heartbeatClient.stop();
this.txnManager.close();
}

private void setWriteTimer(HoodieTable<T, I, K, O> table) {
String commitType = table.getMetaClient().getCommitActionType();
if (commitType.equals(HoodieTimeline.COMMIT_ACTION)) {
writeTimer = metrics.getCommitCtx();
} else if (commitType.equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
writeTimer = metrics.getDeltaCommitCtx();
}
}

private void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
UpgradeDowngrade upgradeDowngrade =
new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);

if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
Contributor:

As per master, this code is available only in the Spark engine; Flink and Java do not have this. Even if we wish to unify, I would do it in a separate patch and get it reviewed by experts who have worked on it. Can you move this to SparkRDDWriteClient for now?

Contributor Author:

I don't think this is accurate -- Flink is also using UpgradeDowngrade. Only the Java engine didn't have a corresponding helper, which I've bootstrapped in here; otherwise, the code of UpgradeDowngrade itself is engine-agnostic.

Can you please elaborate on what your concern around it is?

Frankly, I'm not a fan that I had to budge this refactoring into this PR, but after adding validations into HoodieMetadataPayload, tests started to fail because we were not initializing the MT appropriately, so I had to address that holistically across all engines.

Contributor:

I am fine with it, but I want to get a stamp from devs who work on Flink and Java. Just don't want to miss anything, because not every engine follows the same logic. I checked the code and refactoring once again; it seems to be fine.
@danny0405: we are making some minor refactoring related to the upgrade/downgrade invocation for the Flink engine. Just wanted to keep you in the loop.

Contributor:

@yihua: we are making some minor refactoring related to the upgrade/downgrade invocation for the Java engine. Just wanted to keep you in the loop. Prior to this patch, we did not have any explicit upgrade/downgrade. Just wanted to ensure the unification does not cause any issues. If you can review it once and stamp it, that would be nice.

// Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
List<String> instantsToRollback = getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, instantTime);

Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(metaClient);
Contributor:

May I know why we roll back the failed commits before doing the upgrade? We already try to do that when starting a commit:

CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),

Contributor Author:

The code was migrated as is, so I can't speak from historical context, but my hunch is that we do it to make sure the table is in a consistent state (no leftovers of failed commits) when we start the upgrade process.

Contributor:

> sure table is in a consistent state (no leftovers of failed commits) when we start the upgrade process.

My confusion is why we need to do that for the upgrade.
Is there any restriction here for correctness? The code before the patch does not do so.

Contributor Author:

The code was doing it in some branches (for Spark) but not in others; this PR is simply reconciling that and making the behavior consistent across the board. In general, given that upgrade/downgrade could be acting upon and modifying the table's current state, the table has to be in a consistent state.

Contributor (@danny0405), May 7, 2022:

> In general, given that upgrade/downgrade could be acting upon and modifying table's current state it has to be in a consistent one

I'm wondering about the exact reasoning here; I have two questions:

  1. We already do a rollback when starting a new instant, so why do we roll back again here?
  2. The intermediate/corrupted data would sooner or later be rolled back, and I see that most of the upgrade/downgrade logic does not touch the data files but only the table configs, so why is there a need to roll back here?

Contributor:

This is how the code was in 0.9.0:

      if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
        this.txnManager.beginTransaction();
        try {
          // Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
          this.rollbackFailedWrites(getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER));
          new SparkUpgradeDowngrade(metaClient, config, context)
              .run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
        } finally {
          this.txnManager.endTransaction();
        }
      } else {
        upgradeDowngrade.run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
      }

And here is how it looks in 0.10.0:

if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
        // Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
        List<String> instantsToRollback = getInstantsToRollback(
            metaClient, HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime));
        Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = getPendingRollbackInfos(metaClient);
        instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));
        this.rollbackFailedWrites(pendingRollbacks, true);
        new UpgradeDowngrade(
            metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance())
            .run(HoodieTableVersion.current(), instantTime);
        metaClient.reloadActiveTimeline();
        initializeMetadataTable(Option.of(instantTime));
      }

So, at least from 0.9.0, it's evident that we were adding this only for the multi-writer scenario, i.e., when someone migrates from a single writer to multi-writer, we just wanted to ensure we roll back any partially failed commits.
But I don't see any gaps as such with the current state of things, because this code gets exercised only when an upgrade is required. So, as per the guideline, we should have only one writer in flight when an upgrade happens, and we can do eager rollbacks irrespective of whether multi-writer is enabled or not.
Let me know if you can think of a scenario where we can't trigger eager rollbacks during an upgrade.

Contributor:

@danny0405: happy to jam to see if there are any gaps here.

Contributor:

I have filed a fix here; I think there needs to be some improvement here: #5535

instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty()));

rollbackFailedWrites(pendingRollbacks, true);

new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper)
.run(HoodieTableVersion.current(), instantTime.orElse(null));

metaClient.reloadActiveTimeline();
}
}
}
Original file line number Diff line number Diff line change
@@ -417,6 +417,8 @@ public List<WriteStatus> close() {
writer = null;

// update final size, once for all log files
// TODO we can actually deduce file size purely from AppendResult (based on offset and size
// of the appended block)
for (WriteStatus status: statuses) {
long logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath()));
status.getStat().setFileSizeInBytes(logFileSize);
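As a side note on the TODO above, a minimal sketch of what deducing the size from the append result could look like, assuming the AppendResult returned by the log writer exposes the write offset and the size of the appended block (accessor names are illustrative, not the confirmed Hudi API):

  // Sketch only: derive the final log-file size from the append result instead of
  // issuing an extra FileSystem call per log file (assumed accessor names).
  long deducedLogFileSize = appendResult.offset() + appendResult.size();
  for (WriteStatus status : statuses) {
    status.getStat().setFileSizeInBytes(deducedLogFileSize);
  }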
@@ -18,6 +18,10 @@

package org.apache.hudi.io;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
@@ -35,11 +39,6 @@
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -235,20 +234,14 @@ protected void setupWriteStatus() throws IOException {
stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
stat.setFileId(writeStatus.getFileId());
stat.setPath(new Path(config.getBasePath()), path);
stat.setTotalWriteBytes(computeTotalWriteBytes());
stat.setFileSizeInBytes(computeFileSizeInBytes());
stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());

long fileSize = FSUtils.getFileSize(fs, path);
stat.setTotalWriteBytes(fileSize);
stat.setFileSizeInBytes(fileSize);

RuntimeStats runtimeStats = new RuntimeStats();
runtimeStats.setTotalCreateTime(timer.endTimer());
stat.setRuntimeStats(runtimeStats);
}

protected long computeTotalWriteBytes() throws IOException {
return FSUtils.getFileSize(fs, path);
}

protected long computeFileSizeInBytes() throws IOException {
return FSUtils.getFileSize(fs, path);
}

}