diff --git a/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieWriteClient.java new file mode 100644 index 0000000000000..38ab3bc4ed073 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/AbstractHoodieWriteClient.java @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi; + +import java.util.Collections; +import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.client.embedded.EmbeddedTimelineService; +import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieRollingStat; +import org.apache.hudi.common.model.HoodieRollingStatMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTimeline; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieInstant.State; +import org.apache.hudi.common.util.AvroUtils; +import org.apache.hudi.common.util.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieCommitException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieRollbackException; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.metrics.HoodieMetrics; +import org.apache.hudi.table.HoodieTable; + +import com.codahale.metrics.Timer; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.text.ParseException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Abstract Write Client providing functionality for performing commit, index updates and rollback + * Reused for regular write operations like upsert/insert/bulk-insert.. 
as well as bootstrap.
+ * @param <T> Sub type of HoodieRecordPayload
+ */
+public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
+
+  private static final Logger LOG = LogManager.getLogger(AbstractHoodieWriteClient.class);
+  private static final String UPDATE_STR = "update";
+
+  private final transient HoodieMetrics metrics;
+  private final transient HoodieIndex<T> index;
+
+  private transient Timer.Context writeContext = null;
+
+  protected AbstractHoodieWriteClient(JavaSparkContext jsc, HoodieIndex<T> index, HoodieWriteConfig clientConfig) {
+    super(jsc, clientConfig);
+    this.metrics = new HoodieMetrics(config, config.getTableName());
+    this.index = index;
+  }
+
+  protected AbstractHoodieWriteClient(JavaSparkContext jsc, HoodieIndex<T> index, HoodieWriteConfig clientConfig,
+      Option<EmbeddedTimelineService> timelineServer) {
+    super(jsc, clientConfig, timelineServer);
+    this.metrics = new HoodieMetrics(config, config.getTableName());
+    this.index = index;
+  }
+
+  /**
+   * Commit changes performed at the given commitTime marker.
+   */
+  public boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses) {
+    return commit(commitTime, writeStatuses, Option.empty());
+  }
+
+  /**
+   * Commit changes performed at the given commitTime marker.
+   */
+  public boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses,
+      Option<Map<String, String>> extraMetadata) {
+    HoodieTableMetaClient metaClient = createMetaClient(false);
+    return commit(commitTime, writeStatuses, extraMetadata, metaClient.getCommitActionType());
+  }
+
+  protected JavaRDD<WriteStatus> updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD,
+      HoodieTable<T> table, String commitTime) {
+    // cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
+    // RDD actions that are performed after updating the index.
+    writeStatusRDD = writeStatusRDD.persist(config.getWriteStatusStorageLevel());
+    Timer.Context indexTimer = metrics.getIndexCtx();
+    // Update the index back
+    JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, jsc, table);
+    metrics.updateIndexMetrics(UPDATE_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
+    // Trigger the insert and collect statuses
+    commitOnAutoCommit(commitTime, statuses, table.getMetaClient().getCommitActionType());
+    return statuses;
+  }
+
+  protected void commitOnAutoCommit(String commitTime, JavaRDD<WriteStatus> resultRDD, String actionType) {
+    if (config.shouldAutoCommit()) {
+      LOG.info("Auto commit enabled: Committing " + commitTime);
+      boolean commitResult = commit(commitTime, resultRDD, Option.empty(), actionType);
+      if (!commitResult) {
+        throw new HoodieCommitException("Failed to commit " + commitTime);
+      }
+    } else {
+      LOG.info("Auto commit disabled for " + commitTime);
+    }
+  }
+
+  private boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses,
+      Option<Map<String, String>> extraMetadata, String actionType) {
+
+    LOG.info("Committing " + commitTime);
+    // Create a Hoodie table which encapsulates the commits and files visible
+    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
+
+    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+
+    List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
+
+    updateMetadataAndRollingStats(actionType, metadata, stats);
+
+    // Finalize write
+    finalizeWrite(table, commitTime, stats);
+
+    // add in extra metadata
+    if (extraMetadata.isPresent()) {
+      extraMetadata.get().forEach(metadata::addMetadata);
+    }
+    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
+
+    try {
+      activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, commitTime),
+          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+
+      postCommit(metadata, commitTime, extraMetadata);
+
+      if (writeContext != null) {
+        long durationInMs = metrics.getDurationInMs(writeContext.stop());
+        metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(commitTime).getTime(), durationInMs,
+            metadata, actionType);
+        writeContext = null;
+      }
+      LOG.info("Committed " + commitTime);
+    } catch (IOException e) {
+      throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + commitTime,
+          e);
+    } catch (ParseException e) {
+      throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + commitTime
+          + ". Instant time is not of valid format", e);
+    }
+    return true;
+  }
+
+  /**
+   * Post Commit Hook. Derived classes use this method to perform post-commit processing.
+   * @param metadata Commit Metadata corresponding to committed instant
+   * @param instantTime Instant Time
+   * @param extraMetadata Additional Metadata passed by user
+   * @throws IOException in case of error
+   */
+  protected abstract void postCommit(HoodieCommitMetadata metadata, String instantTime,
+      Option<Map<String, String>> extraMetadata) throws IOException;
+
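Since `commitOnAutoCommit()` above is the only place the auto-commit flag is consulted, a usage sketch may help. This is hypothetical driver code, not part of the patch: it assumes the public `HoodieWriteClient` API shown later in this diff (`startCommit`, `upsert`, `rollback`) and the `withAutoCommit` flag on `HoodieWriteConfig.Builder`; `basePath`, `recordsRdd` and `MyPayload` are placeholders, and imports follow the files in this diff.

```java
// Hypothetical usage sketch (not part of this PR). With auto-commit disabled,
// commitOnAutoCommit() becomes a no-op and the caller publishes the instant
// explicitly through the public commit() overload defined above.
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
    .withPath(basePath)          // placeholder base path
    .withAutoCommit(false)
    .build();
HoodieWriteClient<MyPayload> client = new HoodieWriteClient<>(jsc, cfg);

String instantTime = client.startCommit();
JavaRDD<WriteStatus> statuses = client.upsert(recordsRdd, instantTime);
if (statuses.filter(WriteStatus::hasErrors).isEmpty()) {
  client.commit(instantTime, statuses);   // two-arg overload from this class
} else {
  client.rollback(instantTime);           // nothing was committed
}
```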
+  /**
+   * Finalize Write operation.
+   * @param table HoodieTable
+   * @param instantTime Instant Time
+   * @param stats List of HoodieWriteStats
+   */
+  protected void finalizeWrite(HoodieTable<T> table, String instantTime, List<HoodieWriteStat> stats) {
+    try {
+      final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
+      table.finalizeWrite(jsc, instantTime, stats);
+      if (finalizeCtx != null) {
+        Option<Long> durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop()));
+        durationInMs.ifPresent(duration -> {
+          LOG.info("Finalize write elapsed time (milliseconds): " + duration);
+          metrics.updateFinalizeWriteMetrics(duration, stats.size());
+        });
+      }
+    } catch (HoodieIOException ioe) {
+      throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
+    }
+  }
+
+  private void updateMetadataAndRollingStats(String actionType, HoodieCommitMetadata metadata,
+      List<HoodieWriteStat> writeStats) {
+    // TODO : make sure we cannot rollback / archive last commit file
+    try {
+      // Create a Hoodie table which encapsulates the commits and files visible
+      HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
+      // 0. All of the rolling stat management is only done by the DELTA commit for MOR and COMMIT for COW; otherwise
+      // there may be race conditions
+      HoodieRollingStatMetadata rollingStatMetadata = new HoodieRollingStatMetadata(actionType);
+      // 1. Look up the previous compaction/commit and get the HoodieCommitMetadata from there.
+      // 2. Now, first read the existing rolling stats and merge with the result of current metadata.
+
+      // Need to do this on every commit (delta or commit) to support COW and MOR.
+
+      for (HoodieWriteStat stat : writeStats) {
+        String partitionPath = stat.getPartitionPath();
+        // TODO: why is stat.getPartitionPath() null at times here.
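+        // e.g. numWrites = 100, numUpdateWrites = 30, numDeletes = 10 gives
+        // 100 - (30 - 10) = 80 inserts for the HoodieRollingStat constructed below.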
+        metadata.addWriteStat(partitionPath, stat);
+        HoodieRollingStat hoodieRollingStat = new HoodieRollingStat(stat.getFileId(),
+            stat.getNumWrites() - (stat.getNumUpdateWrites() - stat.getNumDeletes()), stat.getNumUpdateWrites(),
+            stat.getNumDeletes(), stat.getTotalWriteBytes());
+        rollingStatMetadata.addRollingStat(partitionPath, hoodieRollingStat);
+      }
+      // The last rolling stat should be present in the completed timeline
+      Option<HoodieInstant> lastInstant =
+          table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
+      if (lastInstant.isPresent()) {
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+            table.getActiveTimeline().getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
+        Option<String> lastRollingStat = Option
+            .ofNullable(commitMetadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY));
+        if (lastRollingStat.isPresent()) {
+          rollingStatMetadata = rollingStatMetadata
+              .merge(HoodieCommitMetadata.fromBytes(lastRollingStat.get().getBytes(), HoodieRollingStatMetadata.class));
+        }
+      }
+      metadata.addMetadata(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, rollingStatMetadata.toJsonString());
+    } catch (IOException io) {
+      throw new HoodieCommitException("Unable to save rolling stats", io);
+    }
+  }
+
+  public HoodieMetrics getMetrics() {
+    return metrics;
+  }
+
+  public HoodieIndex<T> getIndex() {
+    return index;
+  }
+
+  protected HoodieTable<T> getTableAndInitCtx(OperationType operationType) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    if (operationType == OperationType.DELETE) {
+      setWriteSchemaFromLastInstant(metaClient);
+    }
+    // Create a Hoodie table which encapsulates the commits and files visible
+    HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+    if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) {
+      writeContext = metrics.getCommitCtx();
+    } else {
+      writeContext = metrics.getDeltaCommitCtx();
+    }
+    return table;
+  }
+
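Because the rolling stats land in the commit's extra metadata under `ROLLING_STAT_METADATA_KEY`, they can be read back from the timeline. A minimal sketch, reusing only calls that already appear in this file, written as if running inside this class; error handling is omitted:

```java
// Sketch: reading the rolling stats persisted by updateMetadataAndRollingStats().
HoodieTableMetaClient metaClient = createMetaClient(true);
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
HoodieInstant last = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
HoodieCommitMetadata commitMeta = HoodieCommitMetadata.fromBytes(
    timeline.getInstantDetails(last).get(), HoodieCommitMetadata.class);
String json = commitMeta.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY);
HoodieRollingStatMetadata rollingStats =
    HoodieCommitMetadata.fromBytes(json.getBytes(), HoodieRollingStatMetadata.class);
```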
+  /**
+   * Sets write schema from last instant since deletes may not have schema set in the config.
+   */
+  private void setWriteSchemaFromLastInstant(HoodieTableMetaClient metaClient) {
+    try {
+      HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+      Option<HoodieInstant> lastInstant =
+          activeTimeline.filterCompletedInstants().filter(s -> s.getAction().equals(metaClient.getCommitActionType()))
+              .lastInstant();
+      if (lastInstant.isPresent()) {
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+            activeTimeline.getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
+        if (commitMetadata.getExtraMetadata().containsKey(HoodieCommitMetadata.SCHEMA_KEY)) {
+          config.setSchema(commitMetadata.getExtraMetadata().get(HoodieCommitMetadata.SCHEMA_KEY));
+        } else {
+          throw new HoodieIOException("Latest commit does not have any schema in commit metadata");
+        }
+      } else {
+        throw new HoodieIOException("Deletes issued without any prior commits");
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("IOException thrown while reading last commit metadata", e);
+    }
+  }
+
+  @Override
+  public void close() {
+    // Stop timeline-server if running
+    super.close();
+    // Calling this here releases any resources used by your index, so make sure to finish any related operations
+    // before this point
+    this.index.close();
+  }
+
+  protected void rollbackInternal(String commitToRollback) {
+    final String startRollbackTime = HoodieActiveTimeline.createNewInstantTime();
+    final Timer.Context context = metrics.getRollbackCtx();
+    try {
+      // Create a Hoodie table which encapsulates the commits and files visible
+      HoodieTable<T> table = HoodieTable.getHoodieTable(
+          createMetaClient(true), config, jsc);
+      Option<HoodieInstant> rollbackInstantOpt =
+          Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
+              .filter(instant -> HoodieActiveTimeline.EQUAL.test(instant.getTimestamp(), commitToRollback))
+              .findFirst());
+
+      if (rollbackInstantOpt.isPresent()) {
+        List<HoodieRollbackStat> stats = doRollbackAndGetStats(rollbackInstantOpt.get());
+        finishRollback(context, stats, Collections.singletonList(commitToRollback), startRollbackTime);
+      }
+    } catch (IOException e) {
+      throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitToRollback,
+          e);
+    }
+  }
+
+  protected List<HoodieRollbackStat> doRollbackAndGetStats(final HoodieInstant instantToRollback) throws
+      IOException {
+    final String commitToRollback = instantToRollback.getTimestamp();
+    HoodieTable<T> table = HoodieTable.getHoodieTable(
+        createMetaClient(true), config, jsc);
+    HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline();
+    HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
+    // Check if any of the commits is a savepoint - do not allow rollback on those commits
+    List<String> savepoints = table.getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toList());
+    savepoints.forEach(s -> {
+      if (s.contains(commitToRollback)) {
+        throw new HoodieRollbackException(
+            "Could not rollback a savepointed commit. Delete savepoint first before rolling back: " + s);
+      }
+    });
+
+    if (commitTimeline.empty() && inflightAndRequestedCommitTimeline.empty()) {
+      // nothing to rollback
+      LOG.info("No commits to rollback " + commitToRollback);
+    }
+
+    // Make sure only the last n commits are being rolled back
+    // If there is a commit in-between or after that is not rolled back, then abort
+
+    if ((commitToRollback != null) && !commitTimeline.empty()
+        && !commitTimeline.findInstantsAfter(commitToRollback, Integer.MAX_VALUE).empty()) {
+      throw new HoodieRollbackException(
+          "Found commits after time: " + commitToRollback + ", please rollback greater commits first");
+    }
+
+    List<String> inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp)
+        .collect(Collectors.toList());
+    if ((commitToRollback != null) && !inflights.isEmpty()
+        && (inflights.indexOf(commitToRollback) != inflights.size() - 1)) {
+      throw new HoodieRollbackException(
+          "Found in-flight commits after time: " + commitToRollback + ", please rollback greater commits first");
+    }
+
+    List<HoodieRollbackStat> stats = table.rollback(jsc, instantToRollback, true);
+
+    LOG.info("Deleted inflight commits " + commitToRollback);
+
+    // cleanup index entries
+    if (!getIndex().rollbackCommit(commitToRollback)) {
+      throw new HoodieRollbackException("Rollback index changes failed for time: " + commitToRollback);
+    }
+    LOG.info("Index rolled back for commits " + commitToRollback);
+    return stats;
+  }
+
+  private void finishRollback(final Timer.Context context, List<HoodieRollbackStat> rollbackStats,
+      List<String> commitsToRollback, final String startRollbackTime) throws IOException {
+    HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
+    Option<Long> durationInMs = Option.empty();
+    long numFilesDeleted = rollbackStats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum();
+    if (context != null) {
+      durationInMs = Option.of(metrics.getDurationInMs(context.stop()));
+      metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted);
+    }
+    HoodieRollbackMetadata rollbackMetadata = AvroUtils
+        .convertRollbackMetadata(startRollbackTime, durationInMs, commitsToRollback, rollbackStats);
+    // TODO: varadarb - This will be fixed when Rollback transition mimics that of commit
+    table.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION,
+        startRollbackTime));
+    table.getActiveTimeline().saveAsComplete(
+        new HoodieInstant(true, HoodieTimeline.ROLLBACK_ACTION, startRollbackTime),
+        AvroUtils.serializeRollbackMetadata(rollbackMetadata));
+    LOG.info("Rollback of Commits " + commitsToRollback + " is complete");
+
+    if (!table.getActiveTimeline().getCleanerTimeline().empty()) {
+      LOG.info("Cleaning up older rollback meta files");
+      // Cleanup of older cleaner meta files
+      // TODO - make the commit archival generic and archive rollback metadata
+      FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(),
+          table.getActiveTimeline().getRollbackTimeline().getInstants());
+    }
+  }
+
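The two guards in `doRollbackAndGetStats()` enforce newest-first rollback. An illustrative sequence with hypothetical instant times, assuming completed commits 20190101, 20190102 and 20190103 on the timeline:

```java
client.rollback("20190102");  // throws HoodieRollbackException: 20190103 is a later commit
client.rollback("20190103");  // OK: newest completed commit
client.rollback("20190102");  // OK: now the newest remaining commit
```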
+  /**
+   * Refers to different operation types.
+   */
+  enum OperationType {
+    INSERT,
+    INSERT_PREPPED,
+    UPSERT,
+    UPSERT_PREPPED,
+    DELETE,
+    BULK_INSERT,
+    BULK_INSERT_PREPPED,
+    BOOTSTRAP
+  }
+}
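The class javadoc names bootstrap as a future reuse of this base. A minimal sketch of what such a subclass could look like (hypothetical, not part of this PR; imports as in the file above) shows the template: a concrete client only supplies post-commit behavior, while commit bookkeeping, rolling stats and rollback live in the abstract base.

```java
// Hypothetical subclass sketch, not part of this PR.
public class HoodieBootstrapClient<T extends HoodieRecordPayload> extends AbstractHoodieWriteClient<T> {

  protected HoodieBootstrapClient(JavaSparkContext jsc, HoodieIndex<T> index, HoodieWriteConfig config) {
    super(jsc, index, config);
  }

  @Override
  protected void postCommit(HoodieCommitMetadata metadata, String instantTime,
      Option<Map<String, String>> extraMetadata) throws IOException {
    // Bootstrap needs no inline compaction or cleaning; archiving could be added here.
  }
}
```

diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
index efb6d20115ff8..24cc961a5db0a 100644
--- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
@@ -21,7 +21,6 @@
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
-import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.HoodieRollbackStat;
@@ -31,8 +30,6 @@
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieRollingStat;
-import org.apache.hudi.common.model.HoodieRollingStatMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -79,7 +76,6 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -94,18 +90,15 @@
 *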

* Note that, at any given time, there can only be one Spark job performing these operations on a Hoodie dataset. */ -public class HoodieWriteClient extends AbstractHoodieClient { +public class HoodieWriteClient extends AbstractHoodieWriteClient { private static final Logger LOG = LogManager.getLogger(HoodieWriteClient.class); - private static final String UPDATE_STR = "update"; private static final String LOOKUP_STR = "lookup"; private final boolean rollbackPending; private final transient HoodieMetrics metrics; - private final transient HoodieIndex index; private final transient HoodieCleanClient cleanClient; - private transient Timer.Context writeContext = null; private transient Timer.Context compactionTimer; - private transient Timer.Context indexTimer = null; + /** * @@ -128,8 +121,7 @@ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, b public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackPending, HoodieIndex index, Option timelineService) { - super(jsc, clientConfig, timelineService); - this.index = index; + super(jsc, index, clientConfig, timelineService); this.metrics = new HoodieMetrics(config, config.getTableName()); this.rollbackPending = rollbackPending; this.cleanClient = new HoodieCleanClient<>(jsc, config, metrics, timelineService); @@ -149,10 +141,9 @@ public static SparkConf registerClasses(SparkConf conf) { public JavaRDD> filterExists(JavaRDD> hoodieRecords) { // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); - indexTimer = metrics.getIndexCtx(); - JavaRDD> recordsWithLocation = index.tagLocation(hoodieRecords, jsc, table); + Timer.Context indexTimer = metrics.getIndexCtx(); + JavaRDD> recordsWithLocation = getIndex().tagLocation(hoodieRecords, jsc, table); metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop())); - indexTimer = null; return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown()); } @@ -166,11 +157,10 @@ public JavaRDD upsert(JavaRDD> records, final Strin JavaRDD> dedupedRecords = combineOnCondition(config.shouldCombineBeforeUpsert(), records, config.getUpsertShuffleParallelism()); - indexTimer = metrics.getIndexCtx(); + Timer.Context indexTimer = metrics.getIndexCtx(); // perform index loop up to get existing location of records - JavaRDD> taggedRecords = index.tagLocation(dedupedRecords, jsc, table); + JavaRDD> taggedRecords = getIndex().tagLocation(dedupedRecords, jsc, table); metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 
0L : indexTimer.stop())); - indexTimer = null; return upsertRecordsInternal(taggedRecords, commitTime, table, true); } catch (Throwable e) { if (e instanceof HoodieUpsertException) { @@ -343,14 +333,13 @@ public JavaRDD delete(JavaRDD keys, final String commitT JavaRDD> dedupedRecords = dedupedKeys.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload())); - indexTimer = metrics.getIndexCtx(); + Timer.Context indexTimer = metrics.getIndexCtx(); // perform index loop up to get existing location of records - JavaRDD> taggedRecords = index.tagLocation(dedupedRecords, jsc, table); + JavaRDD> taggedRecords = getIndex().tagLocation(dedupedRecords, jsc, table); // filter out non existant keys/records JavaRDD> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown); if (!taggedValidRecords.isEmpty()) { metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop())); - indexTimer = null; return upsertRecordsInternal(taggedValidRecords, commitTime, table, true); } else { // if entire set of keys are non existent @@ -397,18 +386,6 @@ private JavaRDD bulkInsertInternal(JavaRDD> deduped return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime); } - private void commitOnAutoCommit(String commitTime, JavaRDD resultRDD, String actionType) { - if (config.shouldAutoCommit()) { - LOG.info("Auto commit enabled: Committing " + commitTime); - boolean commitResult = commit(commitTime, resultRDD, Option.empty(), actionType); - if (!commitResult) { - throw new HoodieCommitException("Failed to commit " + commitTime); - } - } else { - LOG.info("Auto commit disabled for " + commitTime); - } - } - private JavaRDD> combineOnCondition(boolean condition, JavaRDD> records, int parallelism) { return condition ? deduplicateRecords(records, parallelism) : records; @@ -486,102 +463,33 @@ private Partitioner getPartitioner(HoodieTable table, boolean isUpsert, Workload } } - private JavaRDD updateIndexAndCommitIfNeeded(JavaRDD writeStatusRDD, HoodieTable table, - String commitTime) { - // cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future - // RDD actions that are performed after updating the index. - writeStatusRDD = writeStatusRDD.persist(config.getWriteStatusStorageLevel()); - indexTimer = metrics.getIndexCtx(); - // Update the index back - JavaRDD statuses = index.updateLocation(writeStatusRDD, jsc, table); - metrics.updateIndexMetrics(UPDATE_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop())); - indexTimer = null; - // Trigger the insert and collect statuses - commitOnAutoCommit(commitTime, statuses, table.getMetaClient().getCommitActionType()); - return statuses; - } - private JavaRDD> partition(JavaRDD> dedupedRecords, Partitioner partitioner) { return dedupedRecords.mapToPair( record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record)) .partitionBy(partitioner).map(Tuple2::_2); } - /** - * Commit changes performed at the given commitTime marker. - */ - public boolean commit(String commitTime, JavaRDD writeStatuses) { - return commit(commitTime, writeStatuses, Option.empty()); - } - - /** - * Commit changes performed at the given commitTime marker. 
- */ - public boolean commit(String commitTime, JavaRDD writeStatuses, - Option> extraMetadata) { - HoodieTableMetaClient metaClient = createMetaClient(false); - return commit(commitTime, writeStatuses, extraMetadata, metaClient.getCommitActionType()); - } - - private boolean commit(String commitTime, JavaRDD writeStatuses, - Option> extraMetadata, String actionType) { - - LOG.info("Commiting " + commitTime); - // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); - - HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); - HoodieCommitMetadata metadata = new HoodieCommitMetadata(); - - List stats = writeStatuses.map(WriteStatus::getStat).collect(); - - updateMetadataAndRollingStats(actionType, metadata, stats); - - // Finalize write - finalizeWrite(table, commitTime, stats); + @Override + protected void postCommit(HoodieCommitMetadata metadata, String instantTime, + Option> extraMetadata) throws IOException { - // add in extra metadata - if (extraMetadata.isPresent()) { - extraMetadata.get().forEach(metadata::addMetadata); + // Do an inline compaction if enabled + if (config.isInlineCompaction()) { + metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true"); + forceCompact(extraMetadata); + } else { + metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false"); } - metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema()); - - try { - activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, commitTime), - Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); - // Save was a success & Do a inline compaction if enabled - if (config.isInlineCompaction()) { - metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true"); - forceCompact(extraMetadata); - } else { - metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false"); - } - - // We cannot have unbounded commit files. Archive commits if we have to archive - HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config, createMetaClient(true)); - archiveLog.archiveIfRequired(jsc); - if (config.isAutoClean()) { - // Call clean to cleanup if there is anything to cleanup after the commit, - LOG.info("Auto cleaning is enabled. Running cleaner now"); - clean(commitTime); - } else { - LOG.info("Auto cleaning is not enabled. Not running cleaner now"); - } - if (writeContext != null) { - long durationInMs = metrics.getDurationInMs(writeContext.stop()); - metrics.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(commitTime).getTime(), durationInMs, - metadata, actionType); - writeContext = null; - } - LOG.info("Committed " + commitTime); - } catch (IOException e) { - throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + commitTime, - e); - } catch (ParseException e) { - throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + commitTime - + "Instant time is not of valid format", e); + // We cannot have unbounded commit files. Archive commits if we have to archive + HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config, createMetaClient(true)); + archiveLog.archiveIfRequired(jsc); + if (config.isAutoClean()) { + // Call clean to cleanup if there is anything to cleanup after the commit, + LOG.info("Auto cleaning is enabled. 
Running cleaner now"); + clean(instantTime); + } else { + LOG.info("Auto cleaning is not enabled. Not running cleaner now"); } - return true; } /** @@ -840,84 +748,6 @@ private Timer.Context startContext() { return metrics.getRollbackCtx(); } - private List doRollbackAndGetStats(final HoodieInstant instantToRollback) throws - IOException { - final String commitToRollback = instantToRollback.getTimestamp(); - HoodieTable table = HoodieTable.getHoodieTable( - createMetaClient(true), config, jsc); - HoodieTimeline inflightAndRequestedCommitTimeline = table.getPendingCommitTimeline(); - HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline(); - // Check if any of the commits is a savepoint - do not allow rollback on those commits - List savepoints = table.getCompletedSavepointTimeline().getInstants().map(HoodieInstant::getTimestamp) - .collect(Collectors.toList()); - savepoints.forEach(s -> { - if (s.contains(commitToRollback)) { - throw new HoodieRollbackException( - "Could not rollback a savepointed commit. Delete savepoint first before rolling back" + s); - } - }); - - if (commitTimeline.empty() && inflightAndRequestedCommitTimeline.empty()) { - // nothing to rollback - LOG.info("No commits to rollback " + commitToRollback); - } - - // Make sure only the last n commits are being rolled back - // If there is a commit in-between or after that is not rolled back, then abort - - if ((commitToRollback != null) && !commitTimeline.empty() - && !commitTimeline.findInstantsAfter(commitToRollback, Integer.MAX_VALUE).empty()) { - throw new HoodieRollbackException( - "Found commits after time :" + commitToRollback + ", please rollback greater commits first"); - } - - List inflights = inflightAndRequestedCommitTimeline.getInstants().map(HoodieInstant::getTimestamp) - .collect(Collectors.toList()); - if ((commitToRollback != null) && !inflights.isEmpty() && (inflights.indexOf(commitToRollback) != inflights.size() - 1)) { - throw new HoodieRollbackException( - "Found in-flight commits after time :" + commitToRollback + ", please rollback greater commits first"); - } - - List stats = table.rollback(jsc, instantToRollback, true); - - LOG.info("Deleted inflight commits " + commitToRollback); - - // cleanup index entries - if (!index.rollbackCommit(commitToRollback)) { - throw new HoodieRollbackException("Rollback index changes failed, for time :" + commitToRollback); - } - LOG.info("Index rolled back for commits " + commitToRollback); - return stats; - } - - private void finishRollback(final Timer.Context context, List rollbackStats, - List commitsToRollback, final String startRollbackTime) throws IOException { - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); - Option durationInMs = Option.empty(); - long numFilesDeleted = rollbackStats.stream().mapToLong(stat -> stat.getSuccessDeleteFiles().size()).sum(); - if (context != null) { - durationInMs = Option.of(metrics.getDurationInMs(context.stop())); - metrics.updateRollbackMetrics(durationInMs.get(), numFilesDeleted); - } - HoodieRollbackMetadata rollbackMetadata = AvroUtils - .convertRollbackMetadata(startRollbackTime, durationInMs, commitsToRollback, rollbackStats); - //TODO: varadarb - This will be fixed in subsequent PR when Rollback and Clean transition mimics that of commit - table.getActiveTimeline().createNewInstant(new HoodieInstant(State.INFLIGHT, HoodieTimeline.ROLLBACK_ACTION, - startRollbackTime)); - table.getActiveTimeline().saveAsComplete( - new HoodieInstant(true, 
HoodieTimeline.ROLLBACK_ACTION, startRollbackTime), - AvroUtils.serializeRollbackMetadata(rollbackMetadata)); - LOG.info("Commits " + commitsToRollback + " rollback is complete"); - - if (!table.getActiveTimeline().getCleanerTimeline().empty()) { - LOG.info("Cleaning up older rollback meta files"); - // Cleanup of older cleaner meta files - // TODO - make the commit archival generic and archive rollback metadata - FSUtils.deleteOlderRollbackMetaFiles(fs, table.getMetaClient().getMetaPath(), - table.getActiveTimeline().getRollbackTimeline().getInstants()); - } - } - private void finishRestore(final Timer.Context context, Map> commitToStats, List commitsToRollback, final String startRestoreTime, final String restoreToInstant) throws IOException { HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); @@ -946,29 +776,6 @@ private void finishRestore(final Timer.Context context, Map table = HoodieTable.getHoodieTable( - createMetaClient(true), config, jsc); - Option rollbackInstantOpt = - Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants() - .filter(instant -> HoodieActiveTimeline.EQUAL.test(instant.getTimestamp(), commitToRollback)) - .findFirst()); - - if (rollbackInstantOpt.isPresent()) { - List stats = doRollbackAndGetStats(rollbackInstantOpt.get()); - finishRollback(context, stats, Collections.singletonList(commitToRollback), startRollbackTime); - } - } catch (IOException e) { - throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitToRollback, - e); - } - } - /** * Releases any resources used by the client. */ @@ -977,9 +784,6 @@ public void close() { // Stop timeline-server if running super.close(); this.cleanClient.close(); - // Calling this here releases any resources used by your index, so make sure to finish any related operations - // before this point - this.index.close(); } /** @@ -1126,7 +930,7 @@ public void commitCompaction(String compactionInstantTime, JavaRDD * Deduplicate Hoodie records, using the given deduplication function. */ JavaRDD> deduplicateRecords(JavaRDD> records, int parallelism) { - boolean isIndexingGlobal = index.isGlobal(); + boolean isIndexingGlobal = getIndex().isGlobal(); return records.mapToPair(record -> { HoodieKey hoodieKey = record.getKey(); // If index used is global, then records are expected to differ in their partitionPath @@ -1146,7 +950,7 @@ JavaRDD> deduplicateRecords(JavaRDD> records, in * Deduplicate Hoodie records, using the given deduplication function. 
*/ JavaRDD deduplicateKeys(JavaRDD keys, int parallelism) { - boolean isIndexingGlobal = index.isGlobal(); + boolean isIndexingGlobal = getIndex().isGlobal(); if (isIndexingGlobal) { return keys.keyBy(HoodieKey::getRecordKey) .reduceByKey((key1, key2) -> key1) @@ -1169,45 +973,6 @@ private void rollbackPendingCommits() { } } - private HoodieTable getTableAndInitCtx(OperationType operationType) { - HoodieTableMetaClient metaClient = createMetaClient(true); - if (operationType == OperationType.DELETE) { - setWriteSchemaFromLastInstant(metaClient); - } - // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) { - writeContext = metrics.getCommitCtx(); - } else { - writeContext = metrics.getDeltaCommitCtx(); - } - return table; - } - - /** - * Sets write schema from last instant since deletes may not have schema set in the config. - */ - private void setWriteSchemaFromLastInstant(HoodieTableMetaClient metaClient) { - try { - HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - Option lastInstant = - activeTimeline.filterCompletedInstants().filter(s -> s.getAction().equals(metaClient.getCommitActionType())) - .lastInstant(); - if (lastInstant.isPresent()) { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - activeTimeline.getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class); - if (commitMetadata.getExtraMetadata().containsKey(HoodieCommitMetadata.SCHEMA_KEY)) { - config.setSchema(commitMetadata.getExtraMetadata().get(HoodieCommitMetadata.SCHEMA_KEY)); - } else { - throw new HoodieIOException("Latest commit does not have any schema in commit metadata"); - } - } else { - throw new HoodieIOException("Deletes issued without any prior commits"); - } - } catch (IOException e) { - throw new HoodieIOException("IOException thrown while reading last commit metadata", e); - } - } /** * Compaction specific private methods */ @@ -1297,22 +1062,6 @@ protected void commitCompaction(JavaRDD compactedStatuses, HoodieTa } } - private void finalizeWrite(HoodieTable table, String instantTime, List stats) { - try { - final Timer.Context finalizeCtx = metrics.getFinalizeCtx(); - table.finalizeWrite(jsc, instantTime, stats); - if (finalizeCtx != null) { - Option durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop())); - durationInMs.ifPresent(duration -> { - LOG.info("Finalize write elapsed time (milliseconds): " + duration); - metrics.updateFinalizeWriteMetrics(duration, stats.size()); - }); - } - } catch (HoodieIOException ioe) { - throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe); - } - } - /** * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file * @@ -1373,59 +1122,4 @@ private Option forceCompact(Option> extraMetadata) t }); return compactionInstantTimeOpt; } - - private void updateMetadataAndRollingStats(String actionType, HoodieCommitMetadata metadata, - List writeStats) { - // TODO : make sure we cannot rollback / archive last commit file - try { - // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); - // 0. 
All of the rolling stat management is only done by the DELTA commit for MOR and COMMIT for COW other wise - // there may be race conditions - HoodieRollingStatMetadata rollingStatMetadata = new HoodieRollingStatMetadata(actionType); - // 1. Look up the previous compaction/commit and get the HoodieCommitMetadata from there. - // 2. Now, first read the existing rolling stats and merge with the result of current metadata. - - // Need to do this on every commit (delta or commit) to support COW and MOR. - - for (HoodieWriteStat stat : writeStats) { - String partitionPath = stat.getPartitionPath(); - // TODO: why is stat.getPartitionPath() null at times here. - metadata.addWriteStat(partitionPath, stat); - HoodieRollingStat hoodieRollingStat = new HoodieRollingStat(stat.getFileId(), - stat.getNumWrites() - (stat.getNumUpdateWrites() - stat.getNumDeletes()), stat.getNumUpdateWrites(), - stat.getNumDeletes(), stat.getTotalWriteBytes()); - rollingStatMetadata.addRollingStat(partitionPath, hoodieRollingStat); - } - // The last rolling stat should be present in the completed timeline - Option lastInstant = - table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant(); - if (lastInstant.isPresent()) { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - table.getActiveTimeline().getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class); - Option lastRollingStat = Option - .ofNullable(commitMetadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY)); - if (lastRollingStat.isPresent()) { - rollingStatMetadata = rollingStatMetadata - .merge(HoodieCommitMetadata.fromBytes(lastRollingStat.get().getBytes(), HoodieRollingStatMetadata.class)); - } - } - metadata.addMetadata(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, rollingStatMetadata.toJsonString()); - } catch (IOException io) { - throw new HoodieCommitException("Unable to save rolling stats"); - } - } - - /** - * Refers to different operation types. - */ - enum OperationType { - INSERT, - INSERT_PREPPED, - UPSERT, - UPSERT_PREPPED, - DELETE, - BULK_INSERT, - BULK_INSERT_PREPPED - } -} +} \ No newline at end of file
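The deduplication semantics that `deduplicateRecords()` keeps (unchanged here apart from the `getIndex()` accessor) are easy to miss in the diff: with a global index, records are keyed by record key alone, so duplicates in different partitions collapse into one. A sketch written as if from a test in the same package, since the method is package-private; `MyPayload`, `payloadV1` and `payloadV2` are placeholders:

```java
HoodieRecord<MyPayload> a = new HoodieRecord<>(new HoodieKey("id_1", "2019/01/01"), payloadV1);
HoodieRecord<MyPayload> b = new HoodieRecord<>(new HoodieKey("id_1", "2019/01/02"), payloadV2);

JavaRDD<HoodieRecord<MyPayload>> deduped =
    client.deduplicateRecords(jsc.parallelize(Arrays.asList(a, b)), 2);

// With a global index: deduped.count() == 1 and the payloads are merged via
// preCombine(). With a non-global index both records survive, since they
// differ in partition path and therefore in HoodieKey.
```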