diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 7b67ff54a2aa5..eca2a3d672901 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -18,6 +18,8 @@ package org.apache.hudi.client; +import com.codahale.metrics.Timer; +import org.apache.hadoop.conf.Configuration; import org.apache.hudi.async.AsyncArchiveService; import org.apache.hudi.async.AsyncCleanerService; import org.apache.hudi.avro.model.HoodieCleanMetadata; @@ -45,6 +47,7 @@ import org.apache.hudi.common.model.TableServiceType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; @@ -73,9 +76,8 @@ import org.apache.hudi.table.action.rollback.RollbackUtils; import org.apache.hudi.table.action.savepoint.SavepointHelpers; import org.apache.hudi.table.marker.WriteMarkersFactory; - -import com.codahale.metrics.Timer; -import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade; +import org.apache.hudi.table.upgrade.UpgradeDowngrade; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -107,15 +109,16 @@ public abstract class BaseHoodieWriteClient index; + private final SupportsUpgradeDowngrade upgradeDowngradeHelper; + private transient WriteOperationType operationType; + private transient HoodieWriteCommitCallback commitCallback; + protected final transient HoodieMetrics metrics; protected transient Timer.Context writeTimer = null; protected transient Timer.Context compactionTimer; protected transient Timer.Context clusteringTimer; - private transient WriteOperationType operationType; - private transient HoodieWriteCommitCallback commitCallback; protected transient AsyncCleanerService asyncCleanerService; protected transient AsyncArchiveService asyncArchiveService; protected final TransactionManager txnManager; @@ -125,25 +128,32 @@ public abstract class BaseHoodieWriteClient timelineService) { + public BaseHoodieWriteClient(HoodieEngineContext context, + HoodieWriteConfig writeConfig, + Option timelineService, + SupportsUpgradeDowngrade upgradeDowngradeHelper) { super(context, writeConfig, timelineService); this.metrics = new HoodieMetrics(config); this.index = createIndex(writeConfig); this.txnManager = new TransactionManager(config, fs); + this.upgradeDowngradeHelper = upgradeDowngradeHelper; } protected abstract HoodieIndex createIndex(HoodieWriteConfig writeConfig); @@ -291,7 +301,7 @@ public void bootstrap(Option> extraMetadata) { if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) { throw new HoodieException("Cannot bootstrap the table in multi-writer mode"); } - HoodieTable table = getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS); + HoodieTable table = initTable(WriteOperationType.UPSERT, Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS)); rollbackFailedBootstrap(); table.bootstrap(context, extraMetadata); } @@ -299,7 +309,7 @@ public void bootstrap(Option> 
extraMetadata) { /** * Main API to rollback failed bootstrap. */ - public void rollbackFailedBootstrap() { + protected void rollbackFailedBootstrap() { LOG.info("Rolling back pending bootstrap if present"); HoodieTable table = createTable(config, hadoopConf, config.isMetadataTableEnabled()); HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); @@ -628,7 +638,7 @@ public void deleteSavepoint(String savepointTime) { * @return true if the savepoint was restored to successfully */ public void restoreToSavepoint(String savepointTime) { - HoodieTable table = createTable(config, hadoopConf, config.isMetadataTableEnabled()); + HoodieTable table = initTable(WriteOperationType.UNKNOWN, Option.empty()); SavepointHelpers.validateSavepointPresence(table, savepointTime); restoreToInstant(savepointTime); SavepointHelpers.validateSavepointRestore(table, savepointTime); @@ -636,25 +646,11 @@ public void restoreToSavepoint(String savepointTime) { @Deprecated public boolean rollback(final String commitInstantTime) throws HoodieRollbackException { - HoodieTable table = createTable(config, hadoopConf); + HoodieTable table = initTable(WriteOperationType.UNKNOWN, Option.empty()); Option pendingRollbackInfo = getPendingRollbackInfo(table.getMetaClient(), commitInstantTime); return rollback(commitInstantTime, pendingRollbackInfo, false); } - /** - * @Deprecated - * Rollback the inflight record changes with the given commit time. This - * will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String)} - * Adding this api for backwards compatability. - * @param commitInstantTime Instant time of the commit - * @param skipLocking if this is triggered by another parent transaction, locking can be skipped. - * @throws HoodieRollbackException if rollback cannot be performed successfully - */ - @Deprecated - public boolean rollback(final String commitInstantTime, boolean skipLocking) throws HoodieRollbackException { - return rollback(commitInstantTime, Option.empty(), skipLocking); - } - /** * @Deprecated * Rollback the inflight record changes with the given commit time. This @@ -711,7 +707,7 @@ public HoodieRestoreMetadata restoreToInstant(final String instantTime) throws H final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime(); Timer.Context timerContext = metrics.getRollbackCtx(); try { - HoodieTable table = createTable(config, hadoopConf, config.isMetadataTableEnabled()); + HoodieTable table = initTable(WriteOperationType.UNKNOWN, Option.empty()); Option restorePlanOption = table.scheduleRestore(context, restoreInstantTime, instantTime); if (restorePlanOption.isPresent()) { HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime); @@ -988,7 +984,7 @@ protected Map> getPendingRollbackInfos /** * Rollback all failed writes. */ - public Boolean rollbackFailedWrites() { + protected Boolean rollbackFailedWrites() { return rollbackFailedWrites(false); } @@ -996,7 +992,7 @@ public Boolean rollbackFailedWrites() { * Rollback all failed writes. * @param skipLocking if this is triggered by another parent transaction, locking can be skipped. 
*/ - public Boolean rollbackFailedWrites(boolean skipLocking) { + protected Boolean rollbackFailedWrites(boolean skipLocking) { HoodieTable table = createTable(config, hadoopConf); List instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty()); Map> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient()); @@ -1246,17 +1242,79 @@ public HoodieMetrics getMetrics() { } /** - * Get HoodieTable and init {@link Timer.Context}. + * Instantiates engine-specific instance of {@link HoodieTable} as well as performs necessary + * bootstrapping operations (for ex, validating whether Metadata Table has to be bootstrapped) + * + * NOTE: THIS OPERATION IS EXECUTED UNDER LOCK, THEREFORE SHOULD AVOID ANY OPERATIONS + * NOT REQUIRING EXTERNAL SYNCHRONIZATION * - * @param operationType write operation type + * @param metaClient instance of {@link HoodieTableMetaClient} * @param instantTime current inflight instant time - * @return HoodieTable + * @return instantiated {@link HoodieTable} */ - protected abstract HoodieTable getTableAndInitCtx(WriteOperationType operationType, String instantTime); + protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option instantTime); /** - * Sets write schema from last instant since deletes may not have schema set in the config. + * Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping + * operations such as: + * + * NOTE: This method is engine-agnostic and SHOULD NOT be overloaded, please check on + * {@link #doInitTable(HoodieTableMetaClient, Option)} instead + * + *
+   * <ul>
+   *   <li>Checking whether upgrade/downgrade is required</li>
+   *   <li>Bootstrapping Metadata Table (if required)</li>
+   *   <li>Initializing metrics contexts</li>
+   * </ul>
+   *
*/ + protected final HoodieTable initTable(WriteOperationType operationType, Option instantTime) { + HoodieTableMetaClient metaClient = createMetaClient(true); + // Setup write schemas for deletes + if (operationType == WriteOperationType.DELETE) { + setWriteSchemaForDeletes(metaClient); + } + + HoodieTable table; + + this.txnManager.beginTransaction(); + try { + tryUpgrade(metaClient, instantTime); + table = doInitTable(metaClient, instantTime); + } finally { + this.txnManager.endTransaction(); + } + + // Validate table properties + metaClient.validateTableProperties(config.getProps(), operationType); + // Make sure that FS View is in sync + table.getHoodieView().sync(); + + switch (operationType) { + case INSERT: + case INSERT_PREPPED: + case UPSERT: + case UPSERT_PREPPED: + case BULK_INSERT: + case BULK_INSERT_PREPPED: + case INSERT_OVERWRITE: + case INSERT_OVERWRITE_TABLE: + setWriteTimer(table); + break; + case CLUSTER: + clusteringTimer = metrics.getClusteringCtx(); + break; + case COMPACT: + compactionTimer = metrics.getCompactionCtx(); + break; + default: + } + + return table; + } + + /** + * Sets write schema from last instant since deletes may not have schema set in the config. + */ protected void setWriteSchemaForDeletes(HoodieTableMetaClient metaClient) { try { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); @@ -1301,4 +1359,33 @@ public void close() { this.heartbeatClient.stop(); this.txnManager.close(); } + + private void setWriteTimer(HoodieTable table) { + String commitType = table.getMetaClient().getCommitActionType(); + if (commitType.equals(HoodieTimeline.COMMIT_ACTION)) { + writeTimer = metrics.getCommitCtx(); + } else if (commitType.equals(HoodieTimeline.DELTA_COMMIT_ACTION)) { + writeTimer = metrics.getDeltaCommitCtx(); + } + } + + private void tryUpgrade(HoodieTableMetaClient metaClient, Option instantTime) { + UpgradeDowngrade upgradeDowngrade = + new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper); + + if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) { + // Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits + List instantsToRollback = getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, instantTime); + + Map> pendingRollbacks = getPendingRollbackInfos(metaClient); + instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty())); + + rollbackFailedWrites(pendingRollbacks, true); + + new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper) + .run(HoodieTableVersion.current(), instantTime.orElse(null)); + + metaClient.reloadActiveTimeline(); + } + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 7eafe268ba8e8..db9083f9e77c4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -417,6 +417,8 @@ public List close() { writer = null; // update final size, once for all log files + // TODO we can actually deduce file size purely from AppendResult (based on offset and size + // of the appended block) for (WriteStatus status: statuses) { long logFileSize = FSUtils.getFileSize(fs, new Path(config.getBasePath(), status.getStat().getPath())); status.getStat().setFileSizeInBytes(logFileSize); diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 096c257b1f797..3e7e0b16e2cf8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -18,6 +18,10 @@ package org.apache.hudi.io; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.Path; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; @@ -35,11 +39,6 @@ import org.apache.hudi.io.storage.HoodieFileWriter; import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.hudi.table.HoodieTable; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -235,20 +234,14 @@ protected void setupWriteStatus() throws IOException { stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT); stat.setFileId(writeStatus.getFileId()); stat.setPath(new Path(config.getBasePath()), path); - stat.setTotalWriteBytes(computeTotalWriteBytes()); - stat.setFileSizeInBytes(computeFileSizeInBytes()); stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords()); + + long fileSize = FSUtils.getFileSize(fs, path); + stat.setTotalWriteBytes(fileSize); + stat.setFileSizeInBytes(fileSize); + RuntimeStats runtimeStats = new RuntimeStats(); runtimeStats.setTotalCreateTime(timer.endTimer()); stat.setRuntimeStats(runtimeStats); } - - protected long computeTotalWriteBytes() throws IOException { - return FSUtils.getFileSize(fs, path); - } - - protected long computeFileSizeInBytes() throws IOException { - return FSUtils.getFileSize(fs, path); - } - } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 1f5d14af744fb..fb613309d3fa2 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -18,6 +18,8 @@ package org.apache.hudi.client; +import com.codahale.metrics.Timer; +import org.apache.hadoop.conf.Configuration; import org.apache.hudi.async.AsyncCleanerService; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.data.HoodieList; @@ -62,9 +64,6 @@ import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper; import org.apache.hudi.table.upgrade.UpgradeDowngrade; import org.apache.hudi.util.FlinkClientUtil; - -import com.codahale.metrics.Timer; -import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,7 +92,7 @@ public class HoodieFlinkWriteClient extends private Option metadataWriterOption = Option.empty(); public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) { - super(context, writeConfig); + super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance()); this.bucketToHandles = new HashMap<>(); } @@ -136,7 +135,7 @@ public void bootstrap(Option> extraMetadata) { @Override public List 
upsert(List> records, String instantTime) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); + initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime)); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient()); final HoodieWriteHandle writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(), @@ -152,7 +151,7 @@ public List upsert(List> records, String instantTim public List upsertPreppedRecords(List> preppedRecords, String instantTime) { // only used for metadata table, the upsert happens in single thread HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); + initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime)); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient()); final HoodieWriteHandle writeHandle = getOrCreateWriteHandle(preppedRecords.get(0), getConfig(), @@ -164,7 +163,7 @@ public List upsertPreppedRecords(List> preppedRecor @Override public List insert(List> records, String instantTime) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.INSERT, instantTime); + initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime)); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient()); // create the write handle if not exists @@ -187,7 +186,7 @@ public List insert(List> records, String instantTim public List insertOverwrite( List> records, final String instantTime) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime); + initTable(WriteOperationType.INSERT_OVERWRITE, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient()); // create the write handle if not exists @@ -206,7 +205,7 @@ public List insertOverwrite( */ public List insertOverwriteTable( List> records, final String instantTime) { - HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime); + HoodieTable table = initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient()); // create the write handle if not exists @@ -239,7 +238,7 @@ public List bulkInsertPreppedRecords(List> preppedR @Override public List delete(List keys, String instantTime) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.DELETE, instantTime); + initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime)); preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient()); HoodieWriteMetadata> result = table.delete(context, instantTime, keys); return postWrite(result, instantTime, table); @@ -397,11 +396,9 @@ public HoodieWriteMetadata> cluster(final String clusteringIns } @Override - protected HoodieTable>, List, List> getTableAndInitCtx(WriteOperationType operationType, String instantTime) { - HoodieTableMetaClient metaClient = createMetaClient(true); - new UpgradeDowngrade(metaClient, config, context, FlinkUpgradeDowngradeHelper.getInstance()) - .run(HoodieTableVersion.current(), instantTime); - return getTableAndInitCtx(metaClient, operationType); + protected HoodieTable>, List, List> doInitTable(HoodieTableMetaClient metaClient, Option instantTime) { + // Create a Hoodie table which encapsulated the 
commits and files visible + return getHoodieTable(); } /** @@ -488,20 +485,6 @@ public void cleanHandlesGracefully() { return writeHandle; } - private HoodieTable>, List, List> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) { - if (operationType == WriteOperationType.DELETE) { - setWriteSchemaForDeletes(metaClient); - } - // Create a Hoodie table which encapsulated the commits and files visible - HoodieFlinkTable table = getHoodieTable(); - if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) { - writeTimer = metrics.getCommitCtx(); - } else { - writeTimer = metrics.getDeltaCommitCtx(); - } - return table; - } - public HoodieFlinkTable getHoodieTable() { return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java index f365f29329782..9de9298c25ae9 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java @@ -18,6 +18,8 @@ package org.apache.hudi.client; +import com.codahale.metrics.Timer; +import org.apache.hadoop.conf.Configuration; import org.apache.hudi.client.common.HoodieJavaEngineContext; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.common.data.HoodieList; @@ -30,7 +32,6 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieNotSupportedException; @@ -40,9 +41,7 @@ import org.apache.hudi.table.HoodieJavaTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; - -import com.codahale.metrics.Timer; -import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.table.upgrade.JavaUpgradeDowngradeHelper; import java.util.List; import java.util.Map; @@ -52,14 +51,14 @@ public class HoodieJavaWriteClient extends BaseHoodieWriteClient>, List, List> { public HoodieJavaWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) { - super(context, clientConfig); + super(context, clientConfig, JavaUpgradeDowngradeHelper.getInstance()); } public HoodieJavaWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending, Option timelineService) { - super(context, writeConfig, timelineService); + super(context, writeConfig, timelineService, JavaUpgradeDowngradeHelper.getInstance()); } @Override @@ -99,7 +98,7 @@ protected HoodieTable>, List, List upsert(List> records, String instantTime) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); + initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime)); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient()); HoodieWriteMetadata> result = table.upsert(context, instantTime, records); @@ -113,7 +112,7 @@ public List upsert(List> records, public List upsertPreppedRecords(List> preppedRecords, String instantTime) { HoodieTable>, List, List> table = - 
getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime); + initTable(WriteOperationType.UPSERT_PREPPED, Option.ofNullable(instantTime)); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient()); HoodieWriteMetadata> result = table.upsertPrepped(context,instantTime, preppedRecords); @@ -123,7 +122,7 @@ public List upsertPreppedRecords(List> preppedRecor @Override public List insert(List> records, String instantTime) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.INSERT, instantTime); + initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime)); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient()); HoodieWriteMetadata> result = table.insert(context, instantTime, records); @@ -137,7 +136,7 @@ public List insert(List> records, String instantTim public List insertPreppedRecords(List> preppedRecords, String instantTime) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime); + initTable(WriteOperationType.INSERT_PREPPED, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.INSERT_PREPPED, table.getMetaClient()); HoodieWriteMetadata> result = table.insertPrepped(context,instantTime, preppedRecords); @@ -169,7 +168,7 @@ public List bulkInsertPreppedRecords(List> preppedR String instantTime, Option>>> bulkInsertPartitioner) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime); + initTable(WriteOperationType.BULK_INSERT_PREPPED, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, table.getMetaClient()); HoodieWriteMetadata> result = table.bulkInsertPrepped(context, instantTime, preppedRecords, bulkInsertPartitioner); @@ -180,7 +179,7 @@ public List bulkInsertPreppedRecords(List> preppedR public List delete(List keys, String instantTime) { HoodieTable>, List, List> table = - getTableAndInitCtx(WriteOperationType.DELETE, instantTime); + initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime)); preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient()); HoodieWriteMetadata> result = table.delete(context,instantTime, keys); return postWrite(result, instantTime, table); @@ -233,23 +232,11 @@ public HoodieWriteMetadata> cluster(final String clusteringIns } @Override - protected HoodieTable>, List, List> getTableAndInitCtx(WriteOperationType operationType, String instantTime) { - HoodieTableMetaClient metaClient = createMetaClient(true); + protected HoodieTable>, List, List> doInitTable(HoodieTableMetaClient metaClient, Option instantTime) { // new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime); - return getTableAndInitCtx(metaClient, operationType); - } - private HoodieTable>, List, List> getTableAndInitCtx(HoodieTableMetaClient metaClient, WriteOperationType operationType) { - if (operationType == WriteOperationType.DELETE) { - setWriteSchemaForDeletes(metaClient); - } // Create a Hoodie table which encapsulated the commits and files visible - HoodieJavaTable table = HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient); - if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) { - writeTimer = metrics.getCommitCtx(); - } else { - writeTimer = metrics.getDeltaCommitCtx(); - 
} - return table; + return HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient); } + } diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/upgrade/JavaUpgradeDowngradeHelper.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/upgrade/JavaUpgradeDowngradeHelper.java new file mode 100644 index 0000000000000..e1c44d0913318 --- /dev/null +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/upgrade/JavaUpgradeDowngradeHelper.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.table.HoodieJavaTable; +import org.apache.hudi.table.HoodieTable; + +/** + * Java upgrade and downgrade helper + */ +public class JavaUpgradeDowngradeHelper implements SupportsUpgradeDowngrade { + + private static final JavaUpgradeDowngradeHelper SINGLETON_INSTANCE = + new JavaUpgradeDowngradeHelper(); + + private JavaUpgradeDowngradeHelper() {} + + public static JavaUpgradeDowngradeHelper getInstance() { + return SINGLETON_INSTANCE; + } + + @Override + public HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) { + return HoodieJavaTable.create(config, context); + } + + @Override + public String getPartitionColumns(HoodieWriteConfig config) { + return config.getProps().getProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index d51d25616c70d..7e142d89c4b87 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -18,15 +18,15 @@ package org.apache.hudi.client; +import com.codahale.metrics.Timer; +import org.apache.hadoop.conf.Configuration; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.client.utils.TransactionUtils; -import org.apache.hudi.common.HoodiePendingRollbackInfo; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.HoodieWrapperFileSystem; import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieKey; import 
org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -35,7 +35,6 @@ import org.apache.hudi.common.model.TableServiceType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -55,10 +54,6 @@ import org.apache.hudi.table.action.compact.CompactHelpers; import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper; -import org.apache.hudi.table.upgrade.UpgradeDowngrade; - -import com.codahale.metrics.Timer; -import org.apache.hadoop.conf.Configuration; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.SparkConf; @@ -94,7 +89,7 @@ public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeC public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, Option timelineService) { - super(context, writeConfig, timelineService); + super(context, writeConfig, timelineService, SparkUpgradeDowngradeHelper.getInstance()); } /** @@ -147,13 +142,13 @@ public JavaRDD> filterExists(JavaRDD> hoodieReco */ @Override public void bootstrap(Option> extraMetadata) { - getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS).bootstrap(context, extraMetadata); + initTable(WriteOperationType.UPSERT, Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS)).bootstrap(context, extraMetadata); } @Override public JavaRDD upsert(JavaRDD> records, String instantTime) { HoodieTable>, JavaRDD, JavaRDD> table = - getTableAndInitCtx(WriteOperationType.UPSERT, instantTime); + initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime)); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient()); HoodieWriteMetadata> result = table.upsert(context, instantTime, records); @@ -166,7 +161,7 @@ public JavaRDD upsert(JavaRDD> records, String inst @Override public JavaRDD upsertPreppedRecords(JavaRDD> preppedRecords, String instantTime) { HoodieTable>, JavaRDD, JavaRDD> table = - getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime); + initTable(WriteOperationType.UPSERT_PREPPED, Option.ofNullable(instantTime)); table.validateUpsertSchema(); preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient()); HoodieWriteMetadata> result = table.upsertPrepped(context,instantTime, preppedRecords); @@ -176,7 +171,7 @@ public JavaRDD upsertPreppedRecords(JavaRDD> preppe @Override public JavaRDD insert(JavaRDD> records, String instantTime) { HoodieTable>, JavaRDD, JavaRDD> table = - getTableAndInitCtx(WriteOperationType.INSERT, instantTime); + initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient()); HoodieWriteMetadata> result = table.insert(context,instantTime, records); @@ -186,7 +181,7 @@ public JavaRDD insert(JavaRDD> records, String inst @Override public JavaRDD insertPreppedRecords(JavaRDD> preppedRecords, String instantTime) { HoodieTable>, JavaRDD, JavaRDD> table = - getTableAndInitCtx(WriteOperationType.INSERT_PREPPED, instantTime); + initTable(WriteOperationType.INSERT_PREPPED, 
Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.INSERT_PREPPED, table.getMetaClient()); HoodieWriteMetadata> result = table.insertPrepped(context,instantTime, preppedRecords); @@ -201,7 +196,7 @@ public JavaRDD insertPreppedRecords(JavaRDD> preppe * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public HoodieWriteResult insertOverwrite(JavaRDD> records, final String instantTime) { - HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime); + HoodieTable table = initTable(WriteOperationType.INSERT_OVERWRITE, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient()); HoodieWriteMetadata result = table.insertOverwrite(context, instantTime, records); @@ -216,7 +211,7 @@ public HoodieWriteResult insertOverwrite(JavaRDD> records, final * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public HoodieWriteResult insertOverwriteTable(JavaRDD> records, final String instantTime) { - HoodieTable table = getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE_TABLE, instantTime); + HoodieTable table = initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient()); HoodieWriteMetadata result = table.insertOverwriteTable(context, instantTime, records); @@ -231,7 +226,7 @@ public JavaRDD bulkInsert(JavaRDD> records, String @Override public JavaRDD bulkInsert(JavaRDD> records, String instantTime, Option>>> userDefinedBulkInsertPartitioner) { HoodieTable>, JavaRDD, JavaRDD> table = - getTableAndInitCtx(WriteOperationType.BULK_INSERT, instantTime); + initTable(WriteOperationType.BULK_INSERT, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.BULK_INSERT, table.getMetaClient()); HoodieWriteMetadata> result = table.bulkInsert(context,instantTime, records, userDefinedBulkInsertPartitioner); @@ -241,7 +236,7 @@ public JavaRDD bulkInsert(JavaRDD> records, String @Override public JavaRDD bulkInsertPreppedRecords(JavaRDD> preppedRecords, String instantTime, Option>>> bulkInsertPartitioner) { HoodieTable>, JavaRDD, JavaRDD> table = - getTableAndInitCtx(WriteOperationType.BULK_INSERT_PREPPED, instantTime); + initTable(WriteOperationType.BULK_INSERT_PREPPED, Option.ofNullable(instantTime)); table.validateInsertSchema(); preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, table.getMetaClient()); HoodieWriteMetadata> result = table.bulkInsertPrepped(context,instantTime, preppedRecords, bulkInsertPartitioner); @@ -250,14 +245,14 @@ public JavaRDD bulkInsertPreppedRecords(JavaRDD> pr @Override public JavaRDD delete(JavaRDD keys, String instantTime) { - HoodieTable>, JavaRDD, JavaRDD> table = getTableAndInitCtx(WriteOperationType.DELETE, instantTime); + HoodieTable>, JavaRDD, JavaRDD> table = initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime)); preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient()); HoodieWriteMetadata> result = table.delete(context,instantTime, keys); return postWrite(result, instantTime, table); } public HoodieWriteResult deletePartitions(List partitions, String instantTime) { - HoodieTable>, JavaRDD, JavaRDD> table = getTableAndInitCtx(WriteOperationType.DELETE_PARTITION, instantTime); + HoodieTable>, JavaRDD, JavaRDD> table = 
initTable(WriteOperationType.DELETE_PARTITION, Option.ofNullable(instantTime)); preWrite(instantTime, WriteOperationType.DELETE_PARTITION, table.getMetaClient()); HoodieWriteMetadata> result = table.deletePartitions(context, instantTime, partitions); return new HoodieWriteResult(postWrite(result, instantTime, table), result.getPartitionToReplaceFileIds()); @@ -420,34 +415,14 @@ private void updateTableMetadata(HoodieTable>, JavaRD } @Override - protected HoodieTable>, JavaRDD, JavaRDD> getTableAndInitCtx(WriteOperationType operationType, - String instantTime) { - HoodieTableMetaClient metaClient = createMetaClient(true); - UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade( - metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()); - try { - this.txnManager.beginTransaction(); - if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) { - // Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits - List instantsToRollback = getInstantsToRollback( - metaClient, HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime)); - Map> pendingRollbacks = getPendingRollbackInfos(metaClient); - instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty())); - this.rollbackFailedWrites(pendingRollbacks, true); - new UpgradeDowngrade( - metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()) - .run(HoodieTableVersion.current(), instantTime); - metaClient.reloadActiveTimeline(); - } - // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation, - // if it didn't exist before - // See https://issues.apache.org/jira/browse/HUDI-3343 for more details - initializeMetadataTable(Option.of(instantTime)); - } finally { - this.txnManager.endTransaction(); - } - metaClient.validateTableProperties(config.getProps(), operationType); - return getTableAndInitCtx(metaClient, operationType, instantTime); + protected HoodieTable>, JavaRDD, JavaRDD> doInitTable(HoodieTableMetaClient metaClient, Option instantTime) { + // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation, + // if it didn't exist before + // See https://issues.apache.org/jira/browse/HUDI-3343 for more details + initializeMetadataTable(instantTime); + + // Create a Hoodie table which encapsulated the commits and files visible + return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled()); } /** @@ -480,22 +455,6 @@ private void completeTableService(TableServiceType tableServiceType, HoodieCommi } } - private HoodieTable>, JavaRDD, JavaRDD> getTableAndInitCtx( - HoodieTableMetaClient metaClient, WriteOperationType operationType, String instantTime) { - if (operationType == WriteOperationType.DELETE) { - setWriteSchemaForDeletes(metaClient); - } - // Create a Hoodie table which encapsulated the commits and files visible - HoodieSparkTable table = HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled()); - if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) { - writeTimer = metrics.getCommitCtx(); - } else { - writeTimer = metrics.getDeltaCommitCtx(); - } - table.getHoodieView().sync(); - return table; - } - @Override protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) { // Create a Hoodie table after startTxn which encapsulated the commits and files visible. 
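
Note on the shape of the refactor above: every engine-specific `getTableAndInitCtx` override is folded into a single `final` `initTable` in `BaseHoodieWriteClient`, which takes the transaction lock, runs the upgrade/downgrade check (and, in the Spark client, metadata-table bootstrapping) and delegates only table instantiation to the engine-specific `doInitTable` hook. The stand-alone sketch below only illustrates that template-method shape; `TemplateWriteClient`, `SparkLikeWriteClient`, the `String` stand-ins for the meta client and table, and the `ReentrantLock` used in place of `TransactionManager` are all simplified placeholders, not the actual Hudi classes.

```java
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;

// Placeholder base class standing in for BaseHoodieWriteClient; Strings stand in for
// HoodieTableMetaClient/HoodieTable, and the ReentrantLock for TransactionManager.
abstract class TemplateWriteClient {

  private final ReentrantLock txnLock = new ReentrantLock();

  /** Engine-specific hook: only instantiates the table, no locking or upgrade logic. */
  protected abstract String doInitTable(String metaClient, Optional<String> instantTime);

  /** Engine-agnostic template: upgrade check and engine hook both run under the same lock. */
  protected final String initTable(Optional<String> instantTime) {
    String metaClient = "metaClient";        // createMetaClient(true) in the real client
    String table;
    txnLock.lock();                          // txnManager.beginTransaction()
    try {
      tryUpgrade(metaClient, instantTime);   // rollback failed writes + upgrade/downgrade if needed
      table = doInitTable(metaClient, instantTime);
    } finally {
      txnLock.unlock();                      // txnManager.endTransaction()
    }
    // table-property validation, FS-view sync and metrics-timer setup happen after the lock is released
    return table;
  }

  private void tryUpgrade(String metaClient, Optional<String> instantTime) {
    // no-op in this sketch; the real method checks HoodieTableVersion and runs UpgradeDowngrade when needed
  }
}

class SparkLikeWriteClient extends TemplateWriteClient {
  @Override
  protected String doInitTable(String metaClient, Optional<String> instantTime) {
    // the real Spark override also bootstraps the metadata table here, before creating the table
    return "HoodieSparkTable(" + metaClient + ")";
  }

  public static void main(String[] args) {
    System.out.println(new SparkLikeWriteClient().initTable(Optional.of("20220101000000")));
  }
}
```
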
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index f51a169dd9b44..552e85af4c66b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -1077,6 +1077,8 @@ private Pair genera writeStat.setPartitionPath(partition); writeStat.setPath(partition + "/" + getBaseFilename(instantTime, newFileId)); writeStat.setFileId(newFileId); + writeStat.setTotalWriteBytes(1); + writeStat.setFileSizeInBytes(1); replaceMetadata.addWriteStat(partition, writeStat); } return Pair.of(requestedReplaceMetadata, replaceMetadata); @@ -1756,6 +1758,8 @@ protected static HoodieCommitMetadata generateCommitMetadata( writeStat.setPartitionPath(partitionPath); writeStat.setPath(partitionPath + "/" + getBaseFilename(instantTime, f)); writeStat.setFileId(f); + writeStat.setTotalWriteBytes(1); + writeStat.setFileSizeInBytes(1); metadata.addWriteStat(partitionPath, writeStat); })); return metadata; diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index f339f5ed910db..71e4b4b4e6e3f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -102,6 +102,7 @@ import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertLinesMatch; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -617,21 +618,7 @@ protected void validateFilesPerPartition(HoodieTestTable testTable, HoodieTableM Collections.sort(fsFileNames); Collections.sort(metadataFilenames); - if ((fsFileNames.size() != metadataFilenames.size()) || (!fsFileNames.equals(metadataFilenames))) { - LOG.info("*** File system listing = " + Arrays.toString(fsFileNames.toArray())); - LOG.info("*** Metadata listing = " + Arrays.toString(metadataFilenames.toArray())); - - for (String fileName : fsFileNames) { - if (!metadataFilenames.contains(fileName)) { - LOG.error(partition + "FsFilename " + fileName + " not found in Meta data"); - } - } - for (String fileName : metadataFilenames) { - if (!fsFileNames.contains(fileName)) { - LOG.error(partition + "Metadata file " + fileName + " not found in original FS"); - } - } - } + assertLinesMatch(fsFileNames, metadataFilenames); assertEquals(fsStatuses.length, partitionToFilesMap.get(partitionPath.toString()).length); // Block sizes should be valid diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java index 1a3d053e23acd..9741ceef3ede3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CollectionUtils.java @@ -77,8 +77,8 @@ public static List combine(List one, List another) { * NOTE: That values associated with overlapping keys from the second map, 
will override * values from the first one */ - public static Map combine(Map one, Map another) { - Map combined = new HashMap<>(one.size() + another.size()); + public static HashMap combine(Map one, Map another) { + HashMap combined = new HashMap<>(one.size() + another.size()); combined.putAll(one); combined.putAll(another); return combined; diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 221b52e77e674..75f83b7c69f26 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -239,10 +240,22 @@ public static HoodieRecord createPartitionFilesRecord(Str Option> filesAdded, Option> filesDeleted) { Map fileInfo = new HashMap<>(); - filesAdded.ifPresent( - m -> m.forEach((filename, size) -> fileInfo.put(filename, new HoodieMetadataFileInfo(size, false)))); - filesDeleted.ifPresent( - m -> m.forEach(filename -> fileInfo.put(filename, new HoodieMetadataFileInfo(0L, true)))); + filesAdded.ifPresent(filesMap -> + fileInfo.putAll( + filesMap.entrySet().stream().collect( + Collectors.toMap(Map.Entry::getKey, (entry) -> { + long fileSize = entry.getValue(); + // Assert that the file-size of the file being added is positive, since Hudi + // should not be creating empty files + checkState(fileSize > 0); + return new HoodieMetadataFileInfo(fileSize, false); + }))) + ); + filesDeleted.ifPresent(filesList -> + fileInfo.putAll( + filesList.stream().collect( + Collectors.toMap(Function.identity(), (ignored) -> new HoodieMetadataFileInfo(0L, true)))) + ); HoodieKey key = new HoodieKey(partition, MetadataPartitionType.FILES.getPartitionPath()); HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo); @@ -288,7 +301,7 @@ public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) { switch (type) { case METADATA_TYPE_PARTITION_LIST: case METADATA_TYPE_FILE_LIST: - Map combinedFileInfo = combineFilesystemMetadata(previousRecord); + Map combinedFileInfo = combineFileSystemMetadata(previousRecord); return new HoodieMetadataPayload(key, type, combinedFileInfo); case METADATA_TYPE_BLOOM_FILTER: HoodieMetadataBloomFilter combineBloomFilterMetadata = combineBloomFilterMetadata(previousRecord); @@ -392,28 +405,53 @@ private Stream> filterFileInfoEntries( return filesystemMetadata.entrySet().stream().filter(e -> e.getValue().getIsDeleted() == isDeleted); } - private Map combineFilesystemMetadata(HoodieMetadataPayload previousRecord) { + private Map combineFileSystemMetadata(HoodieMetadataPayload previousRecord) { Map combinedFileInfo = new HashMap<>(); + + // First, add all files listed in the previous record if (previousRecord.filesystemMetadata != null) { combinedFileInfo.putAll(previousRecord.filesystemMetadata); } + // Second, merge in the files listed in the new record if (filesystemMetadata != null) { - filesystemMetadata.forEach((filename, fileInfo) -> { - // If the filename wasnt present then we carry it forward - if (!combinedFileInfo.containsKey(filename)) { - combinedFileInfo.put(filename, fileInfo); - } else { - if (fileInfo.getIsDeleted()) { - // file deletion - combinedFileInfo.remove(filename); - 
} else { - // file appends. - combinedFileInfo.merge(filename, fileInfo, (oldFileInfo, newFileInfo) -> { - return new HoodieMetadataFileInfo(oldFileInfo.getSize() + newFileInfo.getSize(), false); - }); - } - } + validatePayload(type, filesystemMetadata); + + filesystemMetadata.forEach((key, fileInfo) -> { + combinedFileInfo.merge(key, fileInfo, + // Combine previous record w/ the new one, new records taking precedence over + // the old one + // + // NOTE: That if previous listing contains the file that is being deleted by the tombstone + // record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting + // listing as well as drop the tombstone itself. + // However, if file is not present in the previous record we have to persist tombstone + // record in the listing to make sure we carry forward information that this file + // was deleted. This special case could occur since the merging flow is 2-stage: + // - First we merge records from all of the delta log-files + // - Then we merge records from base-files with the delta ones (coming as a result + // of the previous step) + (oldFileInfo, newFileInfo) -> + // NOTE: We can’t assume that MT update records will be ordered the same way as actual + // FS operations (since they are not atomic), therefore MT record merging should be a + // _commutative_ & _associative_ operation (ie one that would work even in case records + // will get re-ordered), which is + // - Possible for file-sizes (since file-sizes will ever grow, we can simply + // take max of the old and new records) + // - Not possible for is-deleted flags* + // + // *However, we’re assuming that the case of concurrent write and deletion of the same + // file is _impossible_ -- it would only be possible with concurrent upsert and + // rollback operation (affecting the same log-file), which is implausible, b/c either + // of the following have to be true: + // - We’re appending to failed log-file (then the other writer is trying to + // rollback it concurrently, before it’s own write) + // - Rollback (of completed instant) is running concurrently with append (meaning + // that restore is running concurrently with a write, which is also nut supported + // currently) + newFileInfo.getIsDeleted() + ? 
null + : new HoodieMetadataFileInfo(Math.max(newFileInfo.getSize(), oldFileInfo.getSize()), false)); }); } @@ -509,6 +547,14 @@ public String toString() { return sb.toString(); } + private static void validatePayload(int type, Map filesystemMetadata) { + if (type == METADATA_TYPE_FILE_LIST) { + filesystemMetadata.forEach((fileName, fileInfo) -> { + checkState(fileInfo.getIsDeleted() || fileInfo.getSize() > 0, "Existing files should have size > 0"); + }); + } + } + private static T getNestedFieldValue(GenericRecord record, String fieldName) { // NOTE: This routine is more lightweight than {@code HoodieAvroUtils.getNestedFieldVal} if (record.getSchema().getField(fieldName) == null) { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index e569baefb6f06..8b37d03595169 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -40,6 +40,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.ValidationUtils; @@ -147,40 +148,58 @@ public static Map> convertMetada */ public static List convertMetadataToFilesPartitionRecords(HoodieCommitMetadata commitMetadata, String instantTime) { - List records = new LinkedList<>(); - List allPartitions = new LinkedList<>(); - commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> { - final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? NON_PARTITIONED_NAME : partitionStatName; - allPartitions.add(partition); - - Map newFiles = new HashMap<>(writeStats.size()); - writeStats.forEach(hoodieWriteStat -> { - String pathWithPartition = hoodieWriteStat.getPath(); - if (pathWithPartition == null) { - // Empty partition - LOG.warn("Unable to find path in write stat to update metadata table " + hoodieWriteStat); - return; - } - - int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 1 : 0) : partition.length() + 1; - String filename = pathWithPartition.substring(offset); - long totalWriteBytes = newFiles.containsKey(filename) - ? newFiles.get(filename) + hoodieWriteStat.getTotalWriteBytes() - : hoodieWriteStat.getTotalWriteBytes(); - newFiles.put(filename, totalWriteBytes); - }); - // New files added to a partition - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord( - partition, Option.of(newFiles), Option.empty()); - records.add(record); - }); + List records = new ArrayList<>(commitMetadata.getPartitionToWriteStats().size()); + + // Add record bearing partitions list + ArrayList partitionsList = new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet()); + + records.add(HoodieMetadataPayload.createPartitionListRecord(partitionsList)); + + // Update files listing records for each individual partition + List> updatedPartitionFilesRecords = + commitMetadata.getPartitionToWriteStats().entrySet() + .stream() + .map(entry -> { + String partitionStatName = entry.getKey(); + List writeStats = entry.getValue(); + + String partition = partitionStatName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionStatName; + + HashMap updatedFilesToSizesMapping = + writeStats.stream().reduce(new HashMap<>(writeStats.size()), + (map, stat) -> { + String pathWithPartition = stat.getPath(); + if (pathWithPartition == null) { + // Empty partition + LOG.warn("Unable to find path in write stat to update metadata table " + stat); + return map; + } + + int offset = partition.equals(NON_PARTITIONED_NAME) + ? (pathWithPartition.startsWith("/") ? 1 : 0) + : partition.length() + 1; + String filename = pathWithPartition.substring(offset); + + // Since write-stats are coming in no particular order, if the same + // file have previously been appended to w/in the txn, we simply pick max + // of the sizes as reported after every write, since file-sizes are + // monotonically increasing (ie file-size never goes down, unless deleted) + map.merge(filename, stat.getFileSizeInBytes(), Math::max); + + return map; + }, + CollectionUtils::combine); + + return HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(updatedFilesToSizesMapping), + Option.empty()); + }) + .collect(Collectors.toList()); - // New partitions created - HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new ArrayList<>(allPartitions)); - records.add(record); + records.addAll(updatedPartitionFilesRecords); LOG.info("Updating at " + instantTime + " from Commit/" + commitMetadata.getOperationType() + ". #partitions_updated=" + records.size()); + return records; } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index 2b1057fea7c19..8f5e5ae964f83 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -285,7 +285,7 @@ public static void createPartitionMetaFile(String basePath, String partitionPath public static void createBaseFile(String basePath, String partitionPath, String instantTime, String fileId) throws Exception { - createBaseFile(basePath, partitionPath, instantTime, fileId, 0); + createBaseFile(basePath, partitionPath, instantTime, fileId, 1); } public static void createBaseFile(String basePath, String partitionPath, String instantTime, String fileId, long length) diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index f78312217eec2..6f49c69960fc1 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -1057,6 +1057,7 @@ public static List generateHoodieWriteStatForPartition(Map generateHoodieWriteStatForPartitionLogFiles writeStat.setPartitionPath(partition); writeStat.setPath(partition + "/" + fileName); writeStat.setTotalWriteBytes(fileIdInfo.getValue()[1]); + writeStat.setFileSizeInBytes(fileIdInfo.getValue()[1]); writeStats.add(writeStat); } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index d86602ea95c5b..6266c30523092 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -18,6 +18,14 @@ package 
org.apache.hudi.sink; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.util.FileUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieWriteStat; @@ -30,15 +38,6 @@ import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestUtils; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; -import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; -import org.apache.flink.runtime.operators.coordination.OperatorEvent; -import org.apache.flink.util.FileUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -276,6 +275,9 @@ private static WriteMetadataEvent createOperatorEvent( writeStat.setPartitionPath(partitionPath); writeStat.setFileId("fileId123"); writeStat.setPath("path123"); + writeStat.setFileSizeInBytes(123); + writeStat.setTotalWriteBytes(123); + writeStat.setNumWrites(1); writeStatus.setStat(writeStat); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/SparkRDDWriteClientOverride.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/SparkRDDWriteClientOverride.java new file mode 100644 index 0000000000000..20982b5cda688 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/SparkRDDWriteClientOverride.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional; + +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.config.HoodieWriteConfig; + +// Sole purpose of this class is to provide access to otherwise API inaccessible from the tests. 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/SparkRDDWriteClientOverride.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/SparkRDDWriteClientOverride.java new file mode 100644 index 0000000000000..20982b5cda688 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/SparkRDDWriteClientOverride.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional; + +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.config.HoodieWriteConfig; + +// The sole purpose of this class is to provide access to an API that is otherwise inaccessible from the tests. +// While it's certainly not a great pattern, it would require substantial test restructuring to +// eliminate such access to an internal API, so this is considered acceptable given its very limited +// scope (within the current package). +class SparkRDDWriteClientOverride extends org.apache.hudi.client.SparkRDDWriteClient { + + public SparkRDDWriteClientOverride(HoodieEngineContext context, HoodieWriteConfig clientConfig) { + super(context, clientConfig); + } + + @Override + public void rollbackFailedBootstrap() { + super.rollbackFailedBootstrap(); + } +} +
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java index d2257f58d0e80..0d1174a1f7182 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java @@ -20,7 +20,6 @@ import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.avro.model.HoodieFileStatus; -import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.bootstrap.BootstrapMode; import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider; import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector; @@ -253,7 +252,8 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec .withBootstrapParallelism(3) .withBootstrapModeSelector(bootstrapModeSelectorClass).build()) .build(); - SparkRDDWriteClient client = new SparkRDDWriteClient(context, config); + + SparkRDDWriteClientOverride client = new SparkRDDWriteClientOverride(context, config); client.bootstrap(Option.empty()); checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap, numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants); @@ -272,7 +272,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec assertFalse(index.useIndex()); // Run bootstrap again - client = new SparkRDDWriteClient(context, config); + client = new SparkRDDWriteClientOverride(context, config); client.bootstrap(Option.empty()); metaClient.reloadActiveTimeline();
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java index 9146cdc4e81f7..a17fd1089dc13 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java @@ -20,7 +20,6 @@ import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.avro.model.HoodieFileStatus; -import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.bootstrap.BootstrapMode; import org.apache.hudi.client.bootstrap.FullRecordBootstrapDataProvider; import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector; @@ -245,7 +244,8 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec .withBootstrapParallelism(3) .withBootstrapModeSelector(bootstrapModeSelectorClass).build()) .build(); - SparkRDDWriteClient client = new SparkRDDWriteClient(context, config); + + SparkRDDWriteClientOverride client = new SparkRDDWriteClientOverride(context, config); client.bootstrap(Option.empty());
checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap, numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants); @@ -266,7 +266,7 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec assertFalse(index.useIndex()); // Run bootstrap again - client = new SparkRDDWriteClient(context, config); + client = new SparkRDDWriteClientOverride(context, config); client.bootstrap(Option.empty()); metaClient.reloadActiveTimeline();
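The bootstrap tests above keep calling rollbackFailedBootstrap() even though it is now protected on the write client, by routing the call through the package-private SparkRDDWriteClientOverride subclass that widens the method back to public. A generic sketch of that visibility-widening pattern; ProductionClient, ClientForTesting and OverridePatternSketch are hypothetical names, not Hudi classes:

```java
// Production class: the operation is protected, i.e. reserved for subclasses and the framework itself.
class ProductionClient {
  protected void rollbackFailedBootstrap() {
    System.out.println("rolling back pending bootstrap");
  }
}

// Test-only subclass: Java allows an override to widen access (protected -> public),
// so tests can invoke the operation without reflection. Keeping the subclass
// package-private confines this backdoor to the test package that declares it.
class ClientForTesting extends ProductionClient {
  @Override
  public void rollbackFailedBootstrap() {
    super.rollbackFailedBootstrap();
  }
}

public class OverridePatternSketch {
  public static void main(String[] args) {
    // A test can now trigger the protected behavior directly through the widened override.
    new ClientForTesting().rollbackFailedBootstrap();
  }
}
```

The alternatives would be reflection or moving the tests into the client's package; the thin override keeps the production API narrow while limiting the test-only access to a single class.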