From 195a76d8e8369bce14d09ac2b46bfb293eb858ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=BB=E5=85=86=E9=9D=96?= Date: Wed, 21 Sep 2022 10:51:29 +0800 Subject: [PATCH] [HUDI-4148] Preparations and client for hudi table management service --- .../hudi/client/BaseHoodieWriteClient.java | 386 +--------------- .../hudi/client/BaseTableServiceClient.java | 432 ++++++++++++++++++ .../hudi/client/CommonHoodieClient.java | 310 +++++++++++++ .../apache/hudi/client/RunsTableService.java | 9 + .../manager/HoodieTableManagerClient.java | 191 ++++++++ .../apache/hudi/config/HoodieWriteConfig.java | 14 + .../action/clean/CleanPlanActionExecutor.java | 17 +- .../cluster/ClusteringPlanActionExecutor.java | 12 + .../ScheduleCompactionActionExecutor.java | 17 +- .../client/HoodieFlinkTableServiceClient.java | 219 +++++++++ .../hudi/client/HoodieFlinkWriteClient.java | 142 +----- .../client/HoodieSparkClusteringClient.java | 2 +- .../client/SparkRDDTableServiceClient.java | 216 +++++++++ .../hudi/client/SparkRDDWriteClient.java | 149 +----- .../config/HoodieTableManagerConfig.java | 195 ++++++++ .../hudi/configuration/FlinkOptions.java | 7 + .../org/apache/hudi/util/StreamerUtil.java | 1 + .../procedures/RunClusteringProcedure.scala | 2 +- .../hudi/utilities/HoodieClusteringJob.java | 4 +- .../utilities/deltastreamer/DeltaSync.java | 2 +- 20 files changed, 1678 insertions(+), 649 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseTableServiceClient.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CommonHoodieClient.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java create mode 100644 hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTableManagerConfig.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 8c8cf67d618b9..3981d2fa84d33 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -22,25 +22,21 @@ import org.apache.hudi.async.AsyncCleanerService; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCleanMetadata; -import org.apache.hudi.avro.model.HoodieCleanerPlan; -import org.apache.hudi.avro.model.HoodieClusteringPlan; -import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieIndexCommitMetadata; import org.apache.hudi.avro.model.HoodieIndexPlan; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRestorePlan; import org.apache.hudi.avro.model.HoodieRollbackMetadata; -import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.callback.HoodieWriteCommitCallback; import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage; import org.apache.hudi.callback.util.HoodieCommitCallbackFactory; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.client.heartbeat.HeartbeatUtils; -import 
org.apache.hudi.client.transaction.TransactionManager; import org.apache.hudi.client.utils.TransactionUtils; import org.apache.hudi.common.HoodiePendingRollbackInfo; import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.ActionType; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieKey; @@ -56,14 +52,11 @@ import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CleanerUtils; -import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieArchivalConfig; -import org.apache.hudi.config.HoodieClusteringConfig; -import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieException; @@ -89,7 +82,6 @@ import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; -import org.apache.hudi.table.action.rollback.RollbackUtils; import org.apache.hudi.table.action.savepoint.SavepointHelpers; import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade; @@ -107,12 +99,9 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY; @@ -125,7 +114,7 @@ * @param Type of keys * @param Type of outputs */ -public abstract class BaseHoodieWriteClient extends BaseHoodieClient +public abstract class BaseHoodieWriteClient extends CommonHoodieClient implements RunsTableService { protected static final String LOOKUP_STR = "lookup"; @@ -134,17 +123,9 @@ public abstract class BaseHoodieWriteClient index; private final SupportsUpgradeDowngrade upgradeDowngradeHelper; - private transient WriteOperationType operationType; private transient HoodieWriteCommitCallback commitCallback; - protected final transient HoodieMetrics metrics; - protected transient Timer.Context writeTimer = null; - protected transient Timer.Context compactionTimer; - protected transient Timer.Context clusteringTimer; - - protected transient AsyncCleanerService asyncCleanerService; - protected transient AsyncArchiveService asyncArchiveService; - protected final TransactionManager txnManager; + protected BaseTableServiceClient tableServiceClient; protected Option>> lastCompletedTxnAndMetadata = Option.empty(); protected Set pendingInflightAndRequestedInstants; @@ -174,22 +155,12 @@ public BaseHoodieWriteClient(HoodieEngineContext context, Option timelineService, SupportsUpgradeDowngrade upgradeDowngradeHelper) { super(context, writeConfig, timelineService); - this.metrics = new HoodieMetrics(config); this.index = createIndex(writeConfig); - this.txnManager = new TransactionManager(config, fs); this.upgradeDowngradeHelper = upgradeDowngradeHelper; } protected abstract 
HoodieIndex createIndex(HoodieWriteConfig writeConfig); - public void setOperationType(WriteOperationType operationType) { - this.operationType = operationType; - } - - public WriteOperationType getOperationType() { - return this.operationType; - } - /** * Commit changes performed at the given instantTime marker. */ @@ -360,23 +331,6 @@ public void bootstrap(Option> extraMetadata) { table.bootstrap(context, extraMetadata); } - /** - * Main API to rollback failed bootstrap. - */ - protected void rollbackFailedBootstrap() { - LOG.info("Rolling back pending bootstrap if present"); - HoodieTable table = createTable(config, hadoopConf); - HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); - Option instant = Option.fromJavaOptional( - inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst()); - if (instant.isPresent() && HoodieTimeline.compareTimestamps(instant.get(), HoodieTimeline.LESSER_THAN_OR_EQUALS, - HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) { - LOG.info("Found pending bootstrap instants. Rolling them back"); - table.rollbackBootstrap(context, HoodieActiveTimeline.createNewInstantTime()); - LOG.info("Finished rolling back pending bootstrap"); - } - } - /** * Upsert a batch of new records into Hoodie table at the supplied instantTime. * @@ -541,62 +495,12 @@ protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata me if (!tableServicesEnabled(config)) { return; } - if (config.areAnyTableServicesExecutedInline() || config.areAnyTableServicesScheduledInline()) { - if (config.isMetadataTableEnabled()) { - table.getHoodieView().sync(); - } - // Do an inline compaction if enabled - if (config.inlineCompactionEnabled()) { - runAnyPendingCompactions(table); - metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true"); - inlineCompaction(extraMetadata); - } else { - metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false"); - } - - // if just inline schedule is enabled - if (!config.inlineCompactionEnabled() && config.scheduleInlineCompaction() - && !table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants().findAny().isPresent()) { - // proceed only if there are no pending compactions - metadata.addMetadata(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), "true"); - inlineScheduleCompaction(extraMetadata); - } - // Do an inline clustering if enabled - if (config.inlineClusteringEnabled()) { - runAnyPendingClustering(table); - metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true"); - inlineClustering(extraMetadata); - } else { - metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "false"); - } - - // if just inline schedule is enabled - if (!config.inlineClusteringEnabled() && config.scheduleInlineClustering() - && !table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().findAny().isPresent()) { - // proceed only if there are no pending clustering - metadata.addMetadata(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), "true"); - inlineScheduleClustering(extraMetadata); - } + if (!config.areAnyTableServicesExecutedInline() && !config.areAnyTableServicesScheduledInline()) { + return; } - } - protected void runAnyPendingCompactions(HoodieTable table) { - table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants() - .forEach(instant -> { - LOG.info("Running previously failed inflight compaction at instant " + instant); - 
compact(instant.getTimestamp(), true); - }); - } - - protected void runAnyPendingClustering(HoodieTable table) { - table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> { - Option> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant); - if (instantPlan.isPresent()) { - LOG.info("Running pending clustering at instant " + instantPlan.get().getLeft()); - cluster(instant.getTimestamp(), true); - } - }); + tableServiceClient.runTableServicesInline(table, metadata, extraMetadata); } protected void autoCleanOnCommit() { @@ -634,7 +538,7 @@ protected void autoArchiveOnCommit(HoodieTable table, boolean acquireLockForArch * Run any pending compactions. */ public void runAnyPendingCompactions() { - runAnyPendingCompactions(createTable(config, hadoopConf)); + tableServiceClient.runAnyPendingCompactions(createTable(config, hadoopConf)); } /** @@ -731,61 +635,6 @@ public boolean rollback(final String commitInstantTime) throws HoodieRollbackExc return rollback(commitInstantTime, pendingRollbackInfo, false); } - /** - * @Deprecated - * Rollback the inflight record changes with the given commit time. This - * will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String, boolean) - * - * @param commitInstantTime Instant time of the commit - * @param pendingRollbackInfo pending rollback instant and plan if rollback failed from previous attempt. - * @param skipLocking if this is triggered by another parent transaction, locking can be skipped. - * @throws HoodieRollbackException if rollback cannot be performed successfully - */ - @Deprecated - public boolean rollback(final String commitInstantTime, Option pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException { - LOG.info("Begin rollback of instant " + commitInstantTime); - final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime()); - final Timer.Context timerContext = this.metrics.getRollbackCtx(); - try { - HoodieTable table = createTable(config, hadoopConf); - Option commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants() - .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime)) - .findFirst()); - if (commitInstantOpt.isPresent() || pendingRollbackInfo.isPresent()) { - LOG.info(String.format("Scheduling Rollback at instant time : %s " - + "(exists in active timeline: %s), with rollback plan: %s", - rollbackInstantTime, commitInstantOpt.isPresent(), pendingRollbackInfo.isPresent())); - Option rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan())) - .orElseGet(() -> table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers())); - if (rollbackPlanOption.isPresent()) { - // There can be a case where the inflight rollback failed after the instant files - // are deleted for commitInstantTime, so that commitInstantOpt is empty as it is - // not present in the timeline. In such a case, the hoodie instant instance - // is reconstructed to allow the rollback to be reattempted, and the deleteInstants - // is set to false since they are already deleted. - // Execute rollback - HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent() - ? 
table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking) - : table.rollback(context, rollbackInstantTime, new HoodieInstant( - true, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime), - false, skipLocking); - if (timerContext != null) { - long durationInMs = metrics.getDurationInMs(timerContext.stop()); - metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted()); - } - return true; - } else { - throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime); - } - } else { - LOG.warn("Cannot find instant " + commitInstantTime + " in the timeline, for rollback"); - return false; - } - } catch (Exception e) { - throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime, e); - } - } - /** * NOTE : This action requires all writers (ingest and compact) to a table to be stopped before proceeding. Revert * the (inflight/committed) record changes for all commits after the provided instant time. @@ -850,33 +699,7 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean skipLocking) t * @param skipLocking if this is triggered by another parent transaction, locking can be skipped. */ public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException { - if (!tableServicesEnabled(config)) { - return null; - } - final Timer.Context timerContext = metrics.getCleanCtx(); - CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(), - HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking)); - - HoodieTable table = createTable(config, hadoopConf); - if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) { - LOG.info("Cleaner started"); - // proceed only if multiple clean schedules are enabled or if there are no pending cleans. - if (scheduleInline) { - scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN); - table.getMetaClient().reloadActiveTimeline(); - } - } - - // Proceeds to execute any requested or inflight clean instances in the timeline - HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime, skipLocking); - if (timerContext != null && metadata != null) { - long durationMs = metrics.getDurationInMs(timerContext.stop()); - metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted()); - LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files" - + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain() - + " cleanerElapsedMs" + durationMs); - } - return metadata; + return tableServiceClient.clean(cleanInstantTime, scheduleInline, skipLocking); } public HoodieCleanMetadata clean() { @@ -1052,6 +875,19 @@ public void dropIndex(List partitionTypes) { } } + /** + * Performs Clustering for the workload stored in instant-time. + * + * @param clusteringInstantTime Clustering Instant Time + * @return Collection of WriteStatus to inspect errors and counts + */ + public HoodieWriteMetadata cluster(String clusteringInstantTime) { + if (delegateToTableManagerService(config, ActionType.replacecommit)) { + throw new HoodieException(ActionType.replacecommit.name() + " delegate to table management service!"); + } + return cluster(clusteringInstantTime, true); + } + /** * Performs Compaction for the workload stored in instant-time. 
* @@ -1059,6 +895,9 @@ public void dropIndex(List partitionTypes) { * @return Collection of WriteStatus to inspect errors and counts */ public HoodieWriteMetadata compact(String compactionInstantTime) { + if (delegateToTableManagerService(config, ActionType.compaction)) { + throw new HoodieException(ActionType.compaction.name() + " delegate to table management service!"); + } return compact(compactionInstantTime, config.shouldAutoCommit()); } @@ -1077,152 +916,6 @@ public abstract void commitCompaction(String compactionInstantTime, HoodieCommit */ protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime); - /** - * Get inflight time line exclude compaction and clustering. - * @param metaClient - * @return - */ - private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieTableMetaClient metaClient) { - HoodieTimeline inflightTimelineWithReplaceCommit = metaClient.getCommitsTimeline().filterPendingExcludingCompaction(); - HoodieTimeline inflightTimelineExcludeClusteringCommit = inflightTimelineWithReplaceCommit.filter(instant -> { - if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { - Option> instantPlan = ClusteringUtils.getClusteringPlan(metaClient, instant); - return !instantPlan.isPresent(); - } else { - return true; - } - }); - return inflightTimelineExcludeClusteringCommit; - } - - protected Option getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) { - return getPendingRollbackInfo(metaClient, commitToRollback, true); - } - - public Option getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) { - return getPendingRollbackInfos(metaClient, ignoreCompactionAndClusteringInstants).getOrDefault(commitToRollback, Option.empty()); - } - - protected Map> getPendingRollbackInfos(HoodieTableMetaClient metaClient) { - return getPendingRollbackInfos(metaClient, true); - } - - /** - * Fetch map of pending commits to be rolled-back to {@link HoodiePendingRollbackInfo}. - * @param metaClient instance of {@link HoodieTableMetaClient} to use. - * @return map of pending commits to be rolled-back instants to Rollback Instant and Rollback plan Pair. - */ - protected Map> getPendingRollbackInfos(HoodieTableMetaClient metaClient, boolean ignoreCompactionAndClusteringInstants) { - List instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList()); - Map> infoMap = new HashMap<>(); - for (HoodieInstant rollbackInstant : instants) { - HoodieRollbackPlan rollbackPlan; - try { - rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, rollbackInstant); - } catch (Exception e) { - if (rollbackInstant.isRequested()) { - LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", deleting the plan since it's in REQUESTED state", e); - try { - metaClient.getActiveTimeline().deletePending(rollbackInstant); - } catch (HoodieIOException he) { - LOG.warn("Cannot delete " + rollbackInstant, he); - continue; - } - } else { - // Here we assume that if the rollback is inflight, the rollback plan is intact - // in instant.rollback.requested. The exception here can be due to other reasons. 
- LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", skip the plan", e); - } - continue; - } - - try { - String action = rollbackPlan.getInstantToRollback().getAction(); - if (ignoreCompactionAndClusteringInstants) { - if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) { - boolean isClustering = HoodieTimeline.REPLACE_COMMIT_ACTION.equals(action) - && ClusteringUtils.getClusteringPlan(metaClient, new HoodieInstant(true, rollbackPlan.getInstantToRollback().getAction(), - rollbackPlan.getInstantToRollback().getCommitTime())).isPresent(); - if (!isClustering) { - String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime(); - infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan))); - } - } - } else { - infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan))); - } - } catch (Exception e) { - LOG.warn("Processing rollback plan failed for " + rollbackInstant + ", skip the plan", e); - } - } - return infoMap; - } - - /** - * Rollback all failed writes. - */ - protected Boolean rollbackFailedWrites() { - return rollbackFailedWrites(false); - } - - /** - * Rollback all failed writes. - * @param skipLocking if this is triggered by another parent transaction, locking can be skipped. - */ - protected Boolean rollbackFailedWrites(boolean skipLocking) { - HoodieTable table = createTable(config, hadoopConf); - List instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty()); - Map> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient()); - instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty())); - rollbackFailedWrites(pendingRollbacks, skipLocking); - return true; - } - - protected void rollbackFailedWrites(Map> instantsToRollback, boolean skipLocking) { - // sort in reverse order of commit times - LinkedHashMap> reverseSortedRollbackInstants = instantsToRollback.entrySet() - .stream().sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new)); - for (Map.Entry> entry : reverseSortedRollbackInstants.entrySet()) { - if (HoodieTimeline.compareTimestamps(entry.getKey(), HoodieTimeline.LESSER_THAN_OR_EQUALS, - HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) { - // do we need to handle failed rollback of a bootstrap - rollbackFailedBootstrap(); - HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config); - break; - } else { - rollback(entry.getKey(), entry.getValue(), skipLocking); - HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config); - } - } - } - - protected List getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy, Option curInstantTime) { - Stream inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(metaClient) - .getReverseOrderedInstants(); - if (cleaningPolicy.isEager()) { - return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> { - if (curInstantTime.isPresent()) { - return !entry.equals(curInstantTime.get()); - } else { - return true; - } - }).collect(Collectors.toList()); - } else if (cleaningPolicy.isLazy()) { - return inflightInstantsStream.filter(instant -> { - try { - return heartbeatClient.isHeartbeatExpired(instant.getTimestamp()); - } catch (IOException io) { - throw new 
HoodieException("Failed to check heartbeat for instant " + instant, io); - } - }).map(HoodieInstant::getTimestamp).collect(Collectors.toList()); - } else if (cleaningPolicy.isNever()) { - return Collections.EMPTY_LIST; - } else { - throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + config.getFailedWritesCleanPolicy()); - } - } - /** * Ensures compaction instant is in expected state and performs Compaction for the workload stored in instant-time. * @@ -1323,41 +1016,12 @@ public Option scheduleTableService(String instantTime, Option scheduleTableServiceInternal(String instantTime, Option> extraMetadata, - TableServiceType tableServiceType) { - if (!tableServicesEnabled(config)) { - return Option.empty(); - } - switch (tableServiceType) { - case ARCHIVE: - LOG.info("Scheduling archiving is not supported. Skipping."); - return Option.empty(); - case CLUSTER: - LOG.info("Scheduling clustering at instant time :" + instantTime); - Option clusteringPlan = createTable(config, hadoopConf) - .scheduleClustering(context, instantTime, extraMetadata); - return clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty(); - case COMPACT: - LOG.info("Scheduling compaction at instant time :" + instantTime); - Option compactionPlan = createTable(config, hadoopConf) - .scheduleCompaction(context, instantTime, extraMetadata); - return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty(); - case CLEAN: - LOG.info("Scheduling cleaning at instant time :" + instantTime); - Option cleanerPlan = createTable(config, hadoopConf) - .scheduleCleaning(context, instantTime, extraMetadata); - return cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty(); - default: - throw new IllegalArgumentException("Invalid TableService " + tableServiceType); - } - } - /** * Executes a clustering plan on a table, serially before or after an insert/upsert action. * Schedules and executes clustering inline. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseTableServiceClient.java new file mode 100644 index 0000000000000..c090d6d6729f9 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseTableServiceClient.java @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client; + +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCleanerPlan; +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.ActionType; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.TableServiceType; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.CleanerUtils; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieCommitException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.metadata.HoodieTableMetadataWriter; +import org.apache.hudi.metrics.HoodieMetrics; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; + +import com.codahale.metrics.Timer; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.List; +import java.util.Map; + +public abstract class BaseTableServiceClient extends CommonHoodieClient { + + private static final Logger LOG = LogManager.getLogger(BaseTableServiceClient.class); + + protected BaseTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig, HoodieMetrics metrics) { + super(context, clientConfig, Option.empty()); + } + + protected boolean tableServicesEnabled(HoodieWriteConfig config) { + boolean enabled = config.areTableServicesEnabled(); + if (!enabled) { + LOG.warn(String.format("Table services are disabled. Set `%s` to enable.", HoodieWriteConfig.TABLE_SERVICES_ENABLED)); + } + return enabled; + } + + protected boolean delegateToTableManagerService(HoodieWriteConfig config, ActionType actionType) { + boolean supportsAction = config.getTableManagerConfig().isTableManagerSupportsAction(actionType); + if (supportsAction) { + LOG.warn(actionType.name() + " delegate to table manager service!"); + } + return supportsAction; + } + + /** + * Performs a compaction operation on a table, serially before or after an insert/upsert action. + * Scheduling and execution are done inline. 
+ */ + protected Option inlineCompaction(Option> extraMetadata) { + Option compactionInstantTimeOpt = inlineScheduleCompaction(extraMetadata); + compactionInstantTimeOpt.ifPresent(compactInstantTime -> { + // inline compaction should auto commit as the user is never given control + compact(compactInstantTime, true); + }); + return compactionInstantTimeOpt; + } + + private void inlineCompaction(HoodieTable table, Option> extraMetadata) { + if (delegateToTableManagerService(config, ActionType.compaction)) { + scheduleCompaction(extraMetadata); + } else { + runAnyPendingCompactions(table); + inlineCompaction(extraMetadata); + } + } + + protected void runAnyPendingCompactions(HoodieTable table) { + table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants() + .forEach(instant -> { + LOG.info("Running previously failed inflight compaction at instant " + instant); + compact(instant.getTimestamp(), true); + }); + } + + /*** + * Schedules compaction inline. + * @param extraMetadata extrametada to be used. + * @return compaction instant if scheduled. + */ + protected Option inlineScheduleCompaction(Option> extraMetadata) { + return scheduleCompaction(extraMetadata); + } + + /** + * Schedules a new compaction instant. + * + * @param extraMetadata Extra Metadata to be stored + */ + public Option scheduleCompaction(Option> extraMetadata) throws HoodieIOException { + String instantTime = HoodieActiveTimeline.createNewInstantTime(); + return scheduleCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty(); + } + + /** + * Ensures compaction instant is in expected state and performs Compaction for the workload stored in instant-time. + * + * @param compactionInstantTime Compaction Instant Time + * @return Collection of Write Status + */ + protected abstract HoodieWriteMetadata compact(String compactionInstantTime, boolean shouldComplete); + + /** + * Commit a compaction operation. Allow passing additional meta-data to be stored in commit instant file. + * + * @param compactionInstantTime Compaction Instant Time + * @param metadata All the metadata that gets stored along with a commit + * @param extraMetadata Extra Metadata to be stored + */ + public abstract void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, + Option> extraMetadata); + + /** + * Commit Compaction and track metrics. + */ + protected abstract void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime); + + /** + * Schedules a new compaction instant with passed-in instant time. + * + * @param instantTime Compaction Instant Time + * @param extraMetadata Extra Metadata to be stored + */ + public boolean scheduleCompactionAtInstant(String instantTime, Option> extraMetadata) throws HoodieIOException { + return scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT).isPresent(); + } + + /** + * Schedules a new clustering instant. + * + * @param extraMetadata Extra Metadata to be stored + */ + public Option scheduleClustering(Option> extraMetadata) throws HoodieIOException { + String instantTime = HoodieActiveTimeline.createNewInstantTime(); + return scheduleClusteringAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty(); + } + + /** + * Schedules a new clustering instant with passed-in instant time. 
+ * + * @param instantTime clustering Instant Time + * @param extraMetadata Extra Metadata to be stored + */ + public boolean scheduleClusteringAtInstant(String instantTime, Option> extraMetadata) throws HoodieIOException { + return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLUSTER).isPresent(); + } + + /** + * Schedules a new cleaning instant. + * + * @param extraMetadata Extra Metadata to be stored + */ + protected Option scheduleCleaning(Option> extraMetadata) throws HoodieIOException { + String instantTime = HoodieActiveTimeline.createNewInstantTime(); + return scheduleCleaningAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty(); + } + + /** + * Schedules a new cleaning instant with passed-in instant time. + * + * @param instantTime cleaning Instant Time + * @param extraMetadata Extra Metadata to be stored + */ + protected boolean scheduleCleaningAtInstant(String instantTime, Option> extraMetadata) throws HoodieIOException { + return scheduleTableService(instantTime, extraMetadata, TableServiceType.CLEAN).isPresent(); + } + + /** + * Ensures clustering instant is in expected state and performs clustering for the plan stored in metadata. + * + * @param clusteringInstant Clustering Instant Time + * @return Collection of Write Status + */ + public abstract HoodieWriteMetadata cluster(String clusteringInstant, boolean shouldComplete); + + protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option> extraMetadata) { + if (config.isMetadataTableEnabled()) { + table.getHoodieView().sync(); + } + // Do an inline compaction if enabled + if (config.inlineCompactionEnabled()) { + metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true"); + inlineCompaction(table, extraMetadata); + } else { + metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false"); + } + + // if just inline schedule is enabled + if (!config.inlineCompactionEnabled() && config.scheduleInlineCompaction() + && !table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants().findAny().isPresent()) { + // proceed only if there are no pending compactions + metadata.addMetadata(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), "true"); + inlineScheduleCompaction(extraMetadata); + } + + // Do an inline clustering if enabled + if (config.inlineClusteringEnabled()) { + metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true"); + inlineClustering(table, extraMetadata); + } else { + metadata.addMetadata(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "false"); + } + + // if just inline schedule is enabled + if (!config.inlineClusteringEnabled() && config.scheduleInlineClustering() + && !table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().findAny().isPresent()) { + // proceed only if there are no pending clustering + metadata.addMetadata(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), "true"); + inlineScheduleClustering(extraMetadata); + } + } + + /** + * Schedule table services such as clustering, compaction & cleaning. 
+ * + * @param extraMetadata Metadata to pass onto the scheduled service instant + * @param tableServiceType Type of table service to schedule + * @return + */ + public Option scheduleTableService(Option> extraMetadata, TableServiceType tableServiceType) { + String instantTime = HoodieActiveTimeline.createNewInstantTime(); + return scheduleTableService(instantTime, extraMetadata, tableServiceType); + } + + /** + * Schedule table services such as clustering, compaction & cleaning. + * + * @param extraMetadata Metadata to pass onto the scheduled service instant + * @param tableServiceType Type of table service to schedule + * @return + */ + public Option scheduleTableService(String instantTime, Option> extraMetadata, + TableServiceType tableServiceType) { + // A lock is required to guard against race conditions between an on-going writer and scheduling a table service. + final Option inflightInstant = Option.of(new HoodieInstant(HoodieInstant.State.REQUESTED, + tableServiceType.getAction(), instantTime)); + try { + this.txnManager.beginTransaction(inflightInstant, Option.empty()); + LOG.info("Scheduling table service " + tableServiceType); + return scheduleTableServiceInternal(instantTime, extraMetadata, tableServiceType); + } finally { + this.txnManager.endTransaction(inflightInstant); + } + } + + protected Option scheduleTableServiceInternal(String instantTime, Option> extraMetadata, + TableServiceType tableServiceType) { + if (!tableServicesEnabled(config)) { + return Option.empty(); + } + switch (tableServiceType) { + case ARCHIVE: + LOG.info("Scheduling archiving is not supported. Skipping."); + return Option.empty(); + case CLUSTER: + LOG.info("Scheduling clustering at instant time :" + instantTime); + Option clusteringPlan = createTable(config, hadoopConf) + .scheduleClustering(context, instantTime, extraMetadata); + return clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty(); + case COMPACT: + LOG.info("Scheduling compaction at instant time :" + instantTime); + Option compactionPlan = createTable(config, hadoopConf) + .scheduleCompaction(context, instantTime, extraMetadata); + return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty(); + case CLEAN: + LOG.info("Scheduling cleaning at instant time :" + instantTime); + Option cleanerPlan = createTable(config, hadoopConf) + .scheduleCleaning(context, instantTime, extraMetadata); + return cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty(); + default: + throw new IllegalArgumentException("Invalid TableService " + tableServiceType); + } + } + + protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf); + + /** + * Executes a clustering plan on a table, serially before or after an insert/upsert action. + * Schedules and executes clustering inline. + */ + protected Option inlineClustering(Option> extraMetadata) { + Option clusteringInstantOpt = inlineScheduleClustering(extraMetadata); + clusteringInstantOpt.ifPresent(clusteringInstant -> { + // inline cluster should auto commit as the user is never given control + cluster(clusteringInstant, true); + }); + return clusteringInstantOpt; + } + + private void inlineClustering(HoodieTable table, Option> extraMetadata) { + if (delegateToTableManagerService(config, ActionType.replacecommit)) { + scheduleClustering(extraMetadata); + } else { + runAnyPendingClustering(table); + inlineClustering(extraMetadata); + } + } + + /** + * Schedules clustering inline. + * + * @param extraMetadata extrametadata to use. 
+ * @return clustering instant if scheduled. + */ + protected Option inlineScheduleClustering(Option> extraMetadata) { + return scheduleClustering(extraMetadata); + } + + protected void runAnyPendingClustering(HoodieTable table) { + table.getActiveTimeline().filterPendingReplaceTimeline().getInstants().forEach(instant -> { + Option> instantPlan = ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant); + if (instantPlan.isPresent()) { + LOG.info("Running pending clustering at instant " + instantPlan.get().getLeft()); + cluster(instant.getTimestamp(), true); + } + }); + } + + /** + * Finalize Write operation. + * + * @param table HoodieTable + * @param instantTime Instant Time + * @param stats Hoodie Write Stat + */ + protected void finalizeWrite(HoodieTable table, String instantTime, List stats) { + try { + final Timer.Context finalizeCtx = metrics.getFinalizeCtx(); + table.finalizeWrite(context, instantTime, stats); + if (finalizeCtx != null) { + Option durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop())); + durationInMs.ifPresent(duration -> { + LOG.info("Finalize write elapsed time (milliseconds): " + duration); + metrics.updateFinalizeWriteMetrics(duration, stats.size()); + }); + } + } catch (HoodieIOException ioe) { + throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe); + } + } + + /** + * Write the HoodieCommitMetadata to metadata table if available. + * + * @param table {@link HoodieTable} of interest. + * @param instantTime instant time of the commit. + * @param actionType action type of the commit. + * @param metadata instance of {@link HoodieCommitMetadata}. + */ + protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) { + context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName()); + table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime, + table.isTableServiceAction(actionType))); + } + + /** + * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the + * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be + * cleaned). This API provides the flexibility to schedule clean instant asynchronously via + * {@link BaseTableServiceClient#scheduleTableService(String, Option, TableServiceType)} and disable inline scheduling + * of clean. + * + * @param cleanInstantTime instant time for clean. + * @param scheduleInline true if needs to be scheduled inline. false otherwise. + * @param skipLocking if this is triggered by another parent transaction, locking can be skipped. + */ + public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException { + if (!tableServicesEnabled(config)) { + return null; + } + final Timer.Context timerContext = metrics.getCleanCtx(); + CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(), + HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking)); + + HoodieCleanMetadata metadata = null; + HoodieTable table = createTable(config, hadoopConf); + if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) { + LOG.info("Cleaner started"); + // proceed only if multiple clean schedules are enabled or if there are no pending cleans. 
+ if (scheduleInline) { + scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN); + table.getMetaClient().reloadActiveTimeline(); + } + + if (delegateToTableManagerService(config, ActionType.clean)) { + return null; + } + metadata = table.clean(context, cleanInstantTime, skipLocking); + if (timerContext != null && metadata != null) { + long durationMs = metrics.getDurationInMs(timerContext.stop()); + metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted()); + LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files" + + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain() + + " cleanerElapsedMs" + durationMs); + } + } + return metadata; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CommonHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CommonHoodieClient.java new file mode 100644 index 0000000000000..3942cb440aab6 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CommonHoodieClient.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client; + +import org.apache.hudi.async.AsyncArchiveService; +import org.apache.hudi.async.AsyncCleanerService; +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.avro.model.HoodieRollbackPlan; +import org.apache.hudi.client.embedded.EmbeddedTimelineService; +import org.apache.hudi.client.heartbeat.HeartbeatUtils; +import org.apache.hudi.client.transaction.TransactionManager; +import org.apache.hudi.common.HoodiePendingRollbackInfo; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieRollbackException; +import org.apache.hudi.metrics.HoodieMetrics; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.rollback.RollbackUtils; + +import com.codahale.metrics.Timer; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public abstract class CommonHoodieClient extends BaseHoodieClient { + + private static final Logger LOG = LogManager.getLogger(CommonHoodieClient.class); + + protected final transient HoodieMetrics metrics; + protected transient Timer.Context writeTimer = null; + protected transient Timer.Context compactionTimer; + protected transient Timer.Context clusteringTimer; + + protected transient WriteOperationType operationType; + + protected transient AsyncCleanerService asyncCleanerService; + protected transient AsyncArchiveService asyncArchiveService; + protected final TransactionManager txnManager; + + protected CommonHoodieClient(HoodieEngineContext context, HoodieWriteConfig clientConfig, Option timelineServer) { + super(context, clientConfig, timelineServer); + this.metrics = new HoodieMetrics(config); + this.txnManager = new TransactionManager(config, fs); + } + + public void setOperationType(WriteOperationType operationType) { + this.operationType = operationType; + } + + public WriteOperationType getOperationType() { + return this.operationType; + } + + /** + * Get inflight time line exclude compaction and clustering. 
+ * @param metaClient + * @return + */ + private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieTableMetaClient metaClient) { + HoodieTimeline inflightTimelineWithReplaceCommit = metaClient.getCommitsTimeline().filterPendingExcludingCompaction(); + HoodieTimeline inflightTimelineExcludeClusteringCommit = inflightTimelineWithReplaceCommit.filter(instant -> { + if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { + Option> instantPlan = ClusteringUtils.getClusteringPlan(metaClient, instant); + return !instantPlan.isPresent(); + } else { + return true; + } + }); + return inflightTimelineExcludeClusteringCommit; + } + + protected Option getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback) { + return getPendingRollbackInfo(metaClient, commitToRollback, true); + } + + public Option getPendingRollbackInfo(HoodieTableMetaClient metaClient, String commitToRollback, boolean ignoreCompactionAndClusteringInstants) { + return getPendingRollbackInfos(metaClient, ignoreCompactionAndClusteringInstants).getOrDefault(commitToRollback, Option.empty()); + } + + protected Map> getPendingRollbackInfos(HoodieTableMetaClient metaClient) { + return getPendingRollbackInfos(metaClient, true); + } + + /** + * Fetch map of pending commits to be rolled-back to {@link HoodiePendingRollbackInfo}. + * @param metaClient instance of {@link HoodieTableMetaClient} to use. + * @return map of pending commits to be rolled-back instants to Rollback Instant and Rollback plan Pair. + */ + protected Map> getPendingRollbackInfos(HoodieTableMetaClient metaClient, boolean ignoreCompactionAndClusteringInstants) { + List instants = metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList()); + Map> infoMap = new HashMap<>(); + for (HoodieInstant rollbackInstant : instants) { + HoodieRollbackPlan rollbackPlan; + try { + rollbackPlan = RollbackUtils.getRollbackPlan(metaClient, rollbackInstant); + } catch (Exception e) { + if (rollbackInstant.isRequested()) { + LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", deleting the plan since it's in REQUESTED state", e); + try { + metaClient.getActiveTimeline().deletePending(rollbackInstant); + } catch (HoodieIOException he) { + LOG.warn("Cannot delete " + rollbackInstant, he); + continue; + } + } else { + // Here we assume that if the rollback is inflight, the rollback plan is intact + // in instant.rollback.requested. The exception here can be due to other reasons. 
+ LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ", skip the plan", e); + } + continue; + } + + try { + String action = rollbackPlan.getInstantToRollback().getAction(); + if (ignoreCompactionAndClusteringInstants) { + if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) { + boolean isClustering = HoodieTimeline.REPLACE_COMMIT_ACTION.equals(action) + && ClusteringUtils.getClusteringPlan(metaClient, new HoodieInstant(true, rollbackPlan.getInstantToRollback().getAction(), + rollbackPlan.getInstantToRollback().getCommitTime())).isPresent(); + if (!isClustering) { + String instantToRollback = rollbackPlan.getInstantToRollback().getCommitTime(); + infoMap.putIfAbsent(instantToRollback, Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan))); + } + } + } else { + infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(), Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan))); + } + } catch (Exception e) { + LOG.warn("Processing rollback plan failed for " + rollbackInstant + ", skip the plan", e); + } + } + return infoMap; + } + + /** + * Rollback all failed writes. + */ + protected Boolean rollbackFailedWrites() { + return rollbackFailedWrites(false); + } + + /** + * Rollback all failed writes. + * @param skipLocking if this is triggered by another parent transaction, locking can be skipped. + */ + protected Boolean rollbackFailedWrites(boolean skipLocking) { + HoodieTable table = createTable(config, hadoopConf); + List instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(), Option.empty()); + Map> pendingRollbacks = getPendingRollbackInfos(table.getMetaClient()); + instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, Option.empty())); + rollbackFailedWrites(pendingRollbacks, skipLocking); + return true; + } + + protected void rollbackFailedWrites(Map> instantsToRollback, boolean skipLocking) { + // sort in reverse order of commit times + LinkedHashMap> reverseSortedRollbackInstants = instantsToRollback.entrySet() + .stream().sorted((i1, i2) -> i2.getKey().compareTo(i1.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new)); + for (Map.Entry> entry : reverseSortedRollbackInstants.entrySet()) { + if (HoodieTimeline.compareTimestamps(entry.getKey(), HoodieTimeline.LESSER_THAN_OR_EQUALS, + HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) { + // do we need to handle failed rollback of a bootstrap + rollbackFailedBootstrap(); + HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config); + break; + } else { + rollback(entry.getKey(), entry.getValue(), skipLocking); + HeartbeatUtils.deleteHeartbeatFile(fs, basePath, entry.getKey(), config); + } + } + } + + protected List getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy, Option curInstantTime) { + Stream inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(metaClient) + .getReverseOrderedInstants(); + if (cleaningPolicy.isEager()) { + return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> { + if (curInstantTime.isPresent()) { + return !entry.equals(curInstantTime.get()); + } else { + return true; + } + }).collect(Collectors.toList()); + } else if (cleaningPolicy.isLazy()) { + return inflightInstantsStream.filter(instant -> { + try { + return heartbeatClient.isHeartbeatExpired(instant.getTimestamp()); + } catch (IOException io) { + throw new 
HoodieException("Failed to check heartbeat for instant " + instant, io); + } + }).map(HoodieInstant::getTimestamp).collect(Collectors.toList()); + } else if (cleaningPolicy.isNever()) { + return Collections.EMPTY_LIST; + } else { + throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + config.getFailedWritesCleanPolicy()); + } + } + + /** + * @Deprecated + * Rollback the inflight record changes with the given commit time. This + * will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String, boolean) + * + * @param commitInstantTime Instant time of the commit + * @param pendingRollbackInfo pending rollback instant and plan if rollback failed from previous attempt. + * @param skipLocking if this is triggered by another parent transaction, locking can be skipped. + * @throws HoodieRollbackException if rollback cannot be performed successfully + */ + @Deprecated + public boolean rollback(final String commitInstantTime, Option pendingRollbackInfo, boolean skipLocking) throws HoodieRollbackException { + LOG.info("Begin rollback of instant " + commitInstantTime); + final String rollbackInstantTime = pendingRollbackInfo.map(entry -> entry.getRollbackInstant().getTimestamp()).orElse(HoodieActiveTimeline.createNewInstantTime()); + final Timer.Context timerContext = this.metrics.getRollbackCtx(); + try { + HoodieTable table = createTable(config, hadoopConf); + Option commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants() + .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime)) + .findFirst()); + if (commitInstantOpt.isPresent() || pendingRollbackInfo.isPresent()) { + LOG.info(String.format("Scheduling Rollback at instant time : %s " + + "(exists in active timeline: %s), with rollback plan: %s", + rollbackInstantTime, commitInstantOpt.isPresent(), pendingRollbackInfo.isPresent())); + Option rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan())) + .orElseGet(() -> table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers())); + if (rollbackPlanOption.isPresent()) { + // There can be a case where the inflight rollback failed after the instant files + // are deleted for commitInstantTime, so that commitInstantOpt is empty as it is + // not present in the timeline. In such a case, the hoodie instant instance + // is reconstructed to allow the rollback to be reattempted, and the deleteInstants + // is set to false since they are already deleted. + // Execute rollback + HoodieRollbackMetadata rollbackMetadata = commitInstantOpt.isPresent() + ? 
table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, skipLocking) + : table.rollback(context, rollbackInstantTime, new HoodieInstant( + true, rollbackPlanOption.get().getInstantToRollback().getAction(), commitInstantTime), + false, skipLocking); + if (timerContext != null) { + long durationInMs = metrics.getDurationInMs(timerContext.stop()); + metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted()); + } + return true; + } else { + throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime); + } + } else { + LOG.warn("Cannot find instant " + commitInstantTime + " in the timeline, for rollback"); + return false; + } + } catch (Exception e) { + throw new HoodieRollbackException("Failed to rollback " + config.getBasePath() + " commits " + commitInstantTime, e); + } + } + + /** + * Main API to rollback failed bootstrap. + */ + protected void rollbackFailedBootstrap() { + LOG.info("Rolling back pending bootstrap if present"); + HoodieTable table = createTable(config, hadoopConf); + HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); + Option instant = Option.fromJavaOptional( + inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst()); + if (instant.isPresent() && HoodieTimeline.compareTimestamps(instant.get(), HoodieTimeline.LESSER_THAN_OR_EQUALS, + HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) { + LOG.info("Found pending bootstrap instants. Rolling them back"); + table.rollbackBootstrap(context, HoodieActiveTimeline.createNewInstantTime()); + LOG.info("Finished rolling back pending bootstrap"); + } + } + + protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf); +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java index 64e540568e8dc..f7553458171ac 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java @@ -18,6 +18,7 @@ package org.apache.hudi.client; +import org.apache.hudi.common.model.ActionType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.log4j.LogManager; @@ -34,4 +35,12 @@ default boolean tableServicesEnabled(HoodieWriteConfig config) { } return enabled; } + + default boolean delegateToTableManagerService(HoodieWriteConfig config, ActionType actionType) { + boolean supportsAction = config.getTableManagerConfig().isTableManagerSupportsAction(actionType); + if (supportsAction) { + LOG.warn(actionType.name() + " delegate to table manager service!"); + } + return supportsAction; + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java new file mode 100644 index 0000000000000..388247dee58c5 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.table.manager; + +import org.apache.hudi.common.config.HoodieTableManagerConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.ClusteringUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieRemoteException; + +import org.apache.http.client.fluent.Request; +import org.apache.http.client.fluent.Response; +import org.apache.http.client.utils.URIBuilder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Client which send the table service instants to the table management service. + */ +public class HoodieTableManagerClient { + + private static final String BASE_URL = "/v1/hoodie/service"; + + public static final String REGISTER_ENDPOINT = String.format("%s/%s", BASE_URL, "register"); + + public static final String EXECUTE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/execute"); + public static final String DELETE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/delete"); + + public static final String EXECUTE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/execute"); + public static final String DELETE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/delete"); + + public static final String EXECUTE_CLEAN = String.format("%s/%s", BASE_URL, "clean/execute"); + public static final String DELETE_CLEAN = String.format("%s/%s", BASE_URL, "clean/delete"); + + public static final String DATABASE_NAME_PARAM = "db_name"; + public static final String TABLE_NAME_PARAM = "table_name"; + public static final String BASEPATH_PARAM = "basepath"; + public static final String INSTANT_PARAM = "instant"; + public static final String USERNAME = "username"; + public static final String CLUSTER = "cluster"; + public static final String QUEUE = "queue"; + public static final String RESOURCE = "resource"; + public static final String PARALLELISM = "parallelism"; + public static final String EXTRA_PARAMS = "extra_params"; + public static final String EXECUTION_ENGINE = "execution_engine"; + + private final HoodieTableManagerConfig config; + private final HoodieTableMetaClient metaClient; + private final String host; + private final int port; + private final String basePath; + private final String dbName; + private final String tableName; + + private static final Logger LOG = LogManager.getLogger(HoodieTableManagerClient.class); + + public HoodieTableManagerClient(HoodieTableMetaClient metaClient, HoodieTableManagerConfig config) { + this.basePath = metaClient.getBasePathV2().toString(); + this.dbName = metaClient.getTableConfig().getDatabaseName(); + this.tableName = metaClient.getTableConfig().getTableName(); + 
this.host = config.getTableManagerHost(); + this.port = config.getTableManagerPort(); + this.config = config; + this.metaClient = metaClient; + } + + private String executeRequest(String requestPath, Map queryParameters) throws IOException { + URIBuilder builder = + new URIBuilder().setHost(host).setPort(port).setPath(requestPath).setScheme("http"); + queryParameters.forEach(builder::addParameter); + + String url = builder.toString(); + LOG.info("Sending request to table management service : (" + url + ")"); + Response response; + int timeout = this.config.getConnectionTimeout() * 1000; // msec + int requestRetryLimit = config.getConnectionRetryLimit(); + int retry = 0; + + while (retry < requestRetryLimit) { + try { + response = Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute(); + return response.returnContent().asString(); + } catch (IOException e) { + retry++; + LOG.warn(String.format("Failed request to server %s, will retry for %d times", url, requestRetryLimit - retry), e); + if (requestRetryLimit == retry) { + throw e; + } + } + + try { + TimeUnit.SECONDS.sleep(config.getConnectionRetryDelay()); + } catch (InterruptedException e) { + // ignore + } + } + + throw new IOException(String.format("Failed request to table management service %s after retry %d times", url, requestRetryLimit)); + } + + private Map getParamsWithAdditionalParams(String[] paramNames, String[] paramVals) { + Map paramsMap = new HashMap<>(); + paramsMap.put(BASEPATH_PARAM, basePath); + ValidationUtils.checkArgument(paramNames.length == paramVals.length); + for (int i = 0; i < paramNames.length; i++) { + paramsMap.put(paramNames[i], paramVals[i]); + } + return paramsMap; + } + + public void register() { + try { + executeRequest(REGISTER_ENDPOINT, getDefaultParams(null)); + } catch (IOException e) { + throw new HoodieRemoteException(e); + } + } + + public void executeCompaction() { + try { + String instantRange = StringUtils.join(metaClient.reloadActiveTimeline() + .filterPendingCompactionTimeline() + .getInstants() + .map(HoodieInstant::getTimestamp) + .toArray(String[]::new), ","); + + executeRequest(EXECUTE_COMPACTION, getDefaultParams(instantRange)); + } catch (IOException e) { + throw new HoodieRemoteException(e); + } + } + + public void executeClean() { + try { + String instantRange = StringUtils.join(metaClient.reloadActiveTimeline() + .getCleanerTimeline() + .filterInflightsAndRequested() + .getInstants() + .map(HoodieInstant::getTimestamp) + .toArray(String[]::new), ","); + + executeRequest(EXECUTE_CLEAN, getDefaultParams(instantRange)); + } catch (IOException e) { + throw new HoodieRemoteException(e); + } + } + + public void executeClustering() { + try { + metaClient.reloadActiveTimeline(); + String instantRange = StringUtils.join(ClusteringUtils.getPendingClusteringInstantTimes(metaClient) + .stream() + .map(HoodieInstant::getTimestamp) + .toArray(String[]::new), ","); + + executeRequest(EXECUTE_CLUSTERING, getDefaultParams(instantRange)); + } catch (IOException e) { + throw new HoodieRemoteException(e); + } + } + + private Map getDefaultParams(String instantRange) { + return getParamsWithAdditionalParams( + new String[] {DATABASE_NAME_PARAM, TABLE_NAME_PARAM, INSTANT_PARAM, USERNAME, QUEUE, RESOURCE, PARALLELISM, EXTRA_PARAMS, EXECUTION_ENGINE}, + new String[] {dbName, tableName, instantRange, config.getDeployUsername(), config.getDeployQueue(), config.getDeployResource(), + String.valueOf(config.getDeployParallelism()), config.getDeployExtraParams(), 
config.getDeployExecutionEngine()}); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 78c8d677523a2..2f30675142ff3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.HoodieMetastoreConfig; +import org.apache.hudi.common.config.HoodieTableManagerConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.ConsistencyGuardConfig; @@ -497,6 +498,7 @@ public class HoodieWriteConfig extends HoodieConfig { private HoodiePayloadConfig hoodiePayloadConfig; private HoodieMetadataConfig metadataConfig; private HoodieMetastoreConfig metastoreConfig; + private HoodieTableManagerConfig tableManagerConfig; private HoodieCommonConfig commonConfig; private EngineType engineType; @@ -889,6 +891,7 @@ protected HoodieWriteConfig(EngineType engineType, Properties props) { this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build(); this.metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(props).build(); this.metastoreConfig = HoodieMetastoreConfig.newBuilder().fromProperties(props).build(); + this.tableManagerConfig = HoodieTableManagerConfig.newBuilder().fromProperties(props).build(); this.commonConfig = HoodieCommonConfig.newBuilder().fromProperties(props).build(); } @@ -1948,6 +1951,10 @@ public HoodieMetadataConfig getMetadataConfig() { return metadataConfig; } + public HoodieTableManagerConfig getTableManagerConfig() { + return tableManagerConfig; + } + public HoodieCommonConfig getCommonConfig() { return commonConfig; } @@ -2160,6 +2167,13 @@ public boolean isMetastoreEnabled() { return metastoreConfig.enableMetastore(); } + /** + * Table Manager configs. 
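+   * Returns whether the external table management service is enabled for this table,
+   * as configured through {@link org.apache.hudi.common.config.HoodieTableManagerConfig}.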
+ */ + public boolean isTableManagerEnabled() { + return tableManagerConfig.enableTableManager(); + } + public static class Builder { protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java index 7f3b437178fd4..40b40806d8e3c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java @@ -22,7 +22,9 @@ import org.apache.hudi.avro.model.HoodieCleanFileInfo; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; +import org.apache.hudi.client.table.manager.HoodieTableManagerClient; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.ActionType; import org.apache.hudi.common.model.CleanFileInfo; import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -146,6 +148,7 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) { */ protected Option requestClean(String startCleanTime) { final HoodieCleanerPlan cleanerPlan = requestClean(context); + Option option = Option.empty(); if ((cleanerPlan.getFilePathsToBeDeletedPerPartition() != null) && !cleanerPlan.getFilePathsToBeDeletedPerPartition().isEmpty() && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) { @@ -159,9 +162,19 @@ protected Option requestClean(String startCleanTime) { LOG.error("Got exception when saving cleaner requested file", e); throw new HoodieIOException(e.getMessage(), e); } - return Option.of(cleanerPlan); + option = Option.of(cleanerPlan); } - return Option.empty(); + + if (config.getTableManagerConfig().isTableManagerSupportsAction(ActionType.clean)) { + delegateCleanExecutionToTableManager(); + } + + return option; + } + + private void delegateCleanExecutionToTableManager() { + HoodieTableManagerClient tableManagerClient = new HoodieTableManagerClient(table.getMetaClient(), config.getTableManagerConfig()); + tableManagerClient.executeClean(); } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java index e0e02bae8e14c..8a52fc700b9f6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/ClusteringPlanActionExecutor.java @@ -20,7 +20,9 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; +import org.apache.hudi.client.table.manager.HoodieTableManagerClient; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.ActionType; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -105,6 +107,16 @@ public Option execute() { throw new HoodieIOException("Exception scheduling clustering", ioe); } } + + if 
(config.getTableManagerConfig().isTableManagerSupportsAction(ActionType.replacecommit)) { + delegateClusteringExecutionToTableManager(); + } + return planOption; } + + private void delegateClusteringExecutionToTableManager() { + HoodieTableManagerClient tableManagerClient = new HoodieTableManagerClient(table.getMetaClient(), config.getTableManagerConfig()); + tableManagerClient.executeClustering(); + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java index 4fb5f9f7ddba5..0fa01096c8e9e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java @@ -19,8 +19,10 @@ package org.apache.hudi.table.action.compact; import org.apache.hudi.avro.model.HoodieCompactionPlan; +import org.apache.hudi.client.table.manager.HoodieTableManagerClient; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.ActionType; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; @@ -91,6 +93,7 @@ public Option execute() { } HoodieCompactionPlan plan = scheduleCompaction(); + Option option = Option.empty(); if (plan != null && (plan.getOperations() != null) && (!plan.getOperations().isEmpty())) { extraMetadata.ifPresent(plan::setExtraMetadata); HoodieInstant compactionInstant = @@ -101,9 +104,14 @@ public Option execute() { } catch (IOException ioe) { throw new HoodieIOException("Exception scheduling compaction", ioe); } - return Option.of(plan); + option = Option.of(plan); } - return Option.empty(); + + if (config.getTableManagerConfig().isTableManagerSupportsAction(ActionType.compaction)) { + delegateCompactionExecutionToTableManager(); + } + + return option; } private HoodieCompactionPlan scheduleCompaction() { @@ -217,4 +225,9 @@ private Long parsedToSeconds(String time) { } return timestamp; } + + private void delegateCompactionExecutionToTableManager() { + HoodieTableManagerClient tableManagerClient = new HoodieTableManagerClient(table.getMetaClient(), config.getTableManagerConfig()); + tableManagerClient.executeCompaction(); + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java new file mode 100644 index 0000000000000..191d402e2b87f --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.exception.HoodieCommitException; +import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter; +import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; +import org.apache.hudi.metrics.HoodieMetrics; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.compact.CompactHelpers; +import org.apache.hudi.table.marker.WriteMarkersFactory; +import org.apache.hudi.util.FlinkClientUtil; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.text.ParseException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class HoodieFlinkTableServiceClient extends BaseTableServiceClient> { + + private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkTableServiceClient.class); + + /** + * Cached metadata writer for coordinator to reuse for each commit. 
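+   * Lazily created via {@link #initMetadataWriter()}; updates through {@link #writeTableMetadata}
+   * are guarded by the transaction manager lock.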
+ */ + private HoodieBackedTableMetadataWriter metadataWriter; + + protected HoodieFlinkTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig, HoodieMetrics metrics) { + super(context, clientConfig, metrics); + } + + @Override + protected HoodieWriteMetadata> compact(String compactionInstantTime, boolean shouldComplete) { + // only used for metadata table, the compaction happens in single thread + HoodieWriteMetadata> compactionMetadata = getHoodieTable().compact(context, compactionInstantTime); + commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty()); + return compactionMetadata; + } + + @Override + public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option> extraMetadata) { + extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata)); + completeCompaction(metadata, getHoodieTable(), compactionInstantTime); + } + + @Override + protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) { + this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName()); + List writeStats = metadata.getWriteStats(); + final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime); + try { + this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty()); + finalizeWrite(table, compactionCommitTime, writeStats); + // commit to data table after committing to metadata table. + // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a + // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. + writeTableMetadata(table, compactionCommitTime, compactionInstant.getAction(), metadata); + LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata); + CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); + } finally { + this.txnManager.endTransaction(Option.of(compactionInstant)); + } + WriteMarkersFactory + .get(config.getMarkersType(), table, compactionCommitTime) + .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); + if (compactionTimer != null) { + long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); + try { + metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(), + durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION); + } catch (ParseException e) { + throw new HoodieCommitException("Commit time is not of valid format. 
Failed to commit compaction " + + config.getBasePath() + " at time " + compactionCommitTime, e); + } + } + LOG.info("Compacted successfully on commit " + compactionCommitTime); + } + + protected void completeClustering( + HoodieReplaceCommitMetadata metadata, + HoodieTable>, List, List> table, + String clusteringCommitTime) { + this.context.setJobStatus(this.getClass().getSimpleName(), "Collect clustering write status and commit clustering"); + HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime); + List writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e -> + e.getValue().stream()).collect(Collectors.toList()); + if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) { + throw new HoodieClusteringException("Clustering failed to write to files:" + + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(","))); + } + + try { + this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty()); + finalizeWrite(table, clusteringCommitTime, writeStats); + // commit to data table after committing to metadata table. + // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a + // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. + writeTableMetadata(table, clusteringCommitTime, clusteringInstant.getAction(), metadata); + LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata); + table.getActiveTimeline().transitionReplaceInflightToComplete( + HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime), + Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + } catch (IOException e) { + throw new HoodieClusteringException( + "Failed to commit " + table.getMetaClient().getBasePath() + " at time " + clusteringCommitTime, e); + } finally { + this.txnManager.endTransaction(Option.of(clusteringInstant)); + } + + WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime) + .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); + if (clusteringTimer != null) { + long durationInMs = metrics.getDurationInMs(clusteringTimer.stop()); + try { + metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(), + durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION); + } catch (ParseException e) { + throw new HoodieCommitException("Commit time is not of valid format. 
Failed to commit compaction " + + config.getBasePath() + " at time " + clusteringCommitTime, e); + } + } + LOG.info("Clustering successfully on commit " + clusteringCommitTime); + } + + @Override + public HoodieWriteMetadata> cluster(String clusteringInstant, boolean shouldComplete) { + return null; + } + + @Override + protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) { + return null; + } + + public HoodieFlinkTable getHoodieTable() { + return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context); + } + + public void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) { + if (this.metadataWriter == null) { + initMetadataWriter(); + } + try { + // guard the metadata writer with concurrent lock + this.txnManager.getLockManager().lock(); + + // refresh the timeline + + // Note: the data meta client is not refreshed currently, some code path + // relies on the meta client for resolving the latest data schema, + // the schema expects to be immutable for SQL jobs but may be not for non-SQL + // jobs. + this.metadataWriter.initTableMetadata(); + this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType)); + } finally { + this.txnManager.getLockManager().unlock(); + } + } + + /** + * Initialize the table metadata writer, for e.g, bootstrap the metadata table + * from the filesystem if it does not exist. + */ + public void initMetadataWriter() { + this.metadataWriter = (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create( + FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT); + } + + public void initMetadataTable() { + HoodieFlinkTable table = getHoodieTable(); + if (config.isMetadataTableEnabled()) { + // initialize the metadata table path + initMetadataWriter(); + // clean the obsolete index stats + table.deleteMetadataIndexIfNecessary(); + } else { + // delete the metadata table if it was enabled but is now disabled + table.maybeDeleteMetadataTable(); + } + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index 53a5799508470..02bd5c2df6c7c 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -36,13 +36,8 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieClusteringException; -import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.index.FlinkHoodieIndexFactory; import org.apache.hudi.index.HoodieIndex; @@ -54,26 +49,19 @@ import org.apache.hudi.io.FlinkMergeHandle; import org.apache.hudi.io.HoodieWriteHandle; import org.apache.hudi.io.MiniBatchHandle; -import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter; -import 
org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; -import org.apache.hudi.table.action.compact.CompactHelpers; import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper; import org.apache.hudi.table.upgrade.UpgradeDowngrade; -import org.apache.hudi.util.FlinkClientUtil; import com.codahale.metrics.Timer; import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.text.ParseException; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -101,14 +89,10 @@ public class HoodieFlinkWriteClient extends */ private final Map> bucketToHandles; - /** - * Cached metadata writer for coordinator to reuse for each commit. - */ - private HoodieBackedTableMetadataWriter metadataWriter; - public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) { super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance()); this.bucketToHandles = new HashMap<>(); + this.tableServiceClient = new HoodieFlinkTableServiceClient<>(context, writeConfig, getMetrics()); } /** @@ -270,51 +254,8 @@ public void preWrite(String instantTime, WriteOperationType writeOperationType, // remove the async cleaning } - @Override - protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) { - if (this.metadataWriter == null) { - initMetadataWriter(); - } - try { - // guard the metadata writer with concurrent lock - this.txnManager.getLockManager().lock(); - - // refresh the timeline - - // Note: the data meta client is not refreshed currently, some code path - // relies on the meta client for resolving the latest data schema, - // the schema expects to be immutable for SQL jobs but may be not for non-SQL - // jobs. - this.metadataWriter.initTableMetadata(); - this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType)); - } finally { - this.txnManager.getLockManager().unlock(); - } - } - - /** - * Initialize the table metadata writer, for e.g, bootstrap the metadata table - * from the filesystem if it does not exist. - */ - public void initMetadataWriter() { - this.metadataWriter = (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create( - FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT); - } - - /** - * Initialized the metadata table on start up, should only be called once on driver. 
- */ public void initMetadataTable() { - HoodieFlinkTable table = getHoodieTable(); - if (config.isMetadataTableEnabled()) { - // initialize the metadata table path - initMetadataWriter(); - // clean the obsolete index stats - table.deleteMetadataIndexIfNecessary(); - } else { - // delete the metadata table if it was enabled but is now disabled - table.maybeDeleteMetadataTable(); - } + ((HoodieFlinkTableServiceClient) tableServiceClient).initMetadataTable(); } /** @@ -390,9 +331,7 @@ public void commitCompaction( String compactionInstantTime, HoodieCommitMetadata metadata, Option> extraMetadata) { - HoodieFlinkTable table = getHoodieTable(); - extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata)); - completeCompaction(metadata, table, compactionInstantTime); + tableServiceClient.commitCompaction(compactionInstantTime, metadata, extraMetadata); } @Override @@ -400,43 +339,13 @@ public void completeCompaction( HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) { - this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName()); - List writeStats = metadata.getWriteStats(); - final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime); - try { - this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty()); - finalizeWrite(table, compactionCommitTime, writeStats); - // commit to data table after committing to metadata table. - // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a - // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. - writeTableMetadata(table, compactionCommitTime, compactionInstant.getAction(), metadata); - LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata); - CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); - } finally { - this.txnManager.endTransaction(Option.of(compactionInstant)); - } - WriteMarkersFactory - .get(config.getMarkersType(), table, compactionCommitTime) - .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); - if (compactionTimer != null) { - long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); - try { - metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(), - durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION); - } catch (ParseException e) { - throw new HoodieCommitException("Commit time is not of valid format. 
Failed to commit compaction " - + config.getBasePath() + " at time " + compactionCommitTime, e); - } - } - LOG.info("Compacted successfully on commit " + compactionCommitTime); + tableServiceClient.completeCompaction(metadata, table, compactionCommitTime); } @Override protected HoodieWriteMetadata> compact(String compactionInstantTime, boolean shouldComplete) { // only used for metadata table, the compaction happens in single thread - HoodieWriteMetadata> compactionMetadata = getHoodieTable().compact(context, compactionInstantTime); - commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty()); - return compactionMetadata; + return tableServiceClient.compact(compactionInstantTime, shouldComplete); } @Override @@ -448,46 +357,7 @@ private void completeClustering( HoodieReplaceCommitMetadata metadata, HoodieTable>, List, List> table, String clusteringCommitTime) { - this.context.setJobStatus(this.getClass().getSimpleName(), "Collect clustering write status and commit clustering"); - HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime); - List writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e -> - e.getValue().stream()).collect(Collectors.toList()); - if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) { - throw new HoodieClusteringException("Clustering failed to write to files:" - + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(","))); - } - - try { - this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty()); - finalizeWrite(table, clusteringCommitTime, writeStats); - // commit to data table after committing to metadata table. - // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a - // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. - writeTableMetadata(table, clusteringCommitTime, clusteringInstant.getAction(), metadata); - LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata); - table.getActiveTimeline().transitionReplaceInflightToComplete( - HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime), - Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); - } catch (IOException e) { - throw new HoodieClusteringException( - "Failed to commit " + table.getMetaClient().getBasePath() + " at time " + clusteringCommitTime, e); - } finally { - this.txnManager.endTransaction(Option.of(clusteringInstant)); - } - - WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime) - .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); - if (clusteringTimer != null) { - long durationInMs = metrics.getDurationInMs(clusteringTimer.stop()); - try { - metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(), - durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION); - } catch (ParseException e) { - throw new HoodieCommitException("Commit time is not of valid format. 
Failed to commit compaction " - + config.getBasePath() + " at time " + clusteringCommitTime, e); - } - } - LOG.info("Clustering successfully on commit " + clusteringCommitTime); + ((HoodieFlinkTableServiceClient) tableServiceClient).completeClustering(metadata, table, clusteringCommitTime); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java index 0812b366aadac..ce627abb8fc76 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkClusteringClient.java @@ -51,7 +51,7 @@ public HoodieSparkClusteringClient( public void cluster(HoodieInstant instant) throws IOException { LOG.info("Executing clustering instance " + instant); SparkRDDWriteClient writeClient = (SparkRDDWriteClient) clusteringClient; - Option commitMetadata = writeClient.cluster(instant.getTimestamp(), true).getCommitMetadata(); + Option commitMetadata = writeClient.cluster(instant.getTimestamp()).getCommitMetadata(); Stream hoodieWriteStatStream = commitMetadata.get().getPartitionToWriteStats().entrySet().stream().flatMap(e -> e.getValue().stream()); long errorsCount = hoodieWriteStatStream.mapToLong(HoodieWriteStat::getTotalWriteErrors).sum(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java new file mode 100644 index 0000000000000..881248446ea94 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.TableServiceType; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.metadata.HoodieTableMetadataWriter; +import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; +import org.apache.hudi.metrics.HoodieMetrics; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.compact.CompactHelpers; +import org.apache.hudi.table.marker.WriteMarkersFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.spark.api.java.JavaRDD; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class SparkRDDTableServiceClient extends BaseTableServiceClient> { + + private static final Logger LOG = LoggerFactory.getLogger(SparkRDDTableServiceClient.class); + + protected SparkRDDTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig, HoodieMetrics metrics) { + super(context, clientConfig, metrics); + } + + @Override + protected HoodieWriteMetadata> compact(String compactionInstantTime, boolean shouldComplete) { + HoodieSparkTable table = HoodieSparkTable.create(config, context); + HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); + HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); + if (pendingCompactionTimeline.containsInstant(inflightInstant)) { + table.rollbackInflightCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false)); + table.getMetaClient().reloadActiveTimeline(); + } + compactionTimer = metrics.getCompactionCtx(); + HoodieWriteMetadata> writeMetadata = table.compact(context, compactionInstantTime); + HoodieWriteMetadata> compactionMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses())); + if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) { + completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), table, compactionInstantTime); + } + return compactionMetadata; + } + + @Override + public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option> extraMetadata) { + HoodieSparkTable table = HoodieSparkTable.create(config, context); + extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata)); + completeCompaction(metadata, table, compactionInstantTime); + } + + @Override + protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) { + 
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName()); + List writeStats = metadata.getWriteStats(); + final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime); + try { + this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty()); + finalizeWrite(table, compactionCommitTime, writeStats); + // commit to data table after committing to metadata table. + updateTableMetadata(table, metadata, compactionInstant); + LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata); + CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); + } finally { + this.txnManager.endTransaction(Option.of(compactionInstant)); + } + WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime) + .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); + if (compactionTimer != null) { + long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); + HoodieActiveTimeline.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant -> + metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION) + ); + } + LOG.info("Compacted successfully on commit " + compactionCommitTime); + } + + @Override + public HoodieWriteMetadata> cluster(String clusteringInstant, boolean shouldComplete) { + HoodieSparkTable table = HoodieSparkTable.create(config, context); + HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline(); + HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant); + if (pendingClusteringTimeline.containsInstant(inflightInstant)) { + table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false)); + table.getMetaClient().reloadActiveTimeline(); + } + clusteringTimer = metrics.getClusteringCtx(); + LOG.info("Starting clustering at " + clusteringInstant); + HoodieWriteMetadata> writeMetadata = table.cluster(context, clusteringInstant); + HoodieWriteMetadata> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses())); + // TODO : Where is shouldComplete used ? 
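+      // shouldComplete is used just below: when it is true and commit metadata is available, the
+      // replacecommit is completed inline via completeTableService; otherwise the caller completes it later.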
+ if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) { + completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant); + } + return clusteringMetadata; + } + + // TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy + private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata, + HoodieTable table, + String commitInstant) { + + switch (tableServiceType) { + case CLUSTER: + completeClustering((HoodieReplaceCommitMetadata) metadata, table, commitInstant); + break; + case COMPACT: + completeCompaction(metadata, table, commitInstant); + break; + default: + throw new IllegalArgumentException("This table service is not valid " + tableServiceType); + } + } + + private void completeClustering(HoodieReplaceCommitMetadata metadata, + HoodieTable table, + String clusteringCommitTime) { + List writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e -> + e.getValue().stream()).collect(Collectors.toList()); + + if (writeStats.stream().mapToLong(s -> s.getTotalWriteErrors()).sum() > 0) { + throw new HoodieClusteringException("Clustering failed to write to files:" + + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(","))); + } + + final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime); + try { + this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty()); + + finalizeWrite(table, clusteringCommitTime, writeStats); + // Update table's metadata (table) + updateTableMetadata(table, metadata, clusteringInstant); + + LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata); + + table.getActiveTimeline().transitionReplaceInflightToComplete( + clusteringInstant, + Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + } catch (Exception e) { + throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e); + } finally { + this.txnManager.endTransaction(Option.of(clusteringInstant)); + } + WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime) + .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); + if (clusteringTimer != null) { + long durationInMs = metrics.getDurationInMs(clusteringTimer.stop()); + HoodieActiveTimeline.parseDateFromInstantTimeSafely(clusteringCommitTime).ifPresent(parsedInstant -> + metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION) + ); + } + LOG.info("Clustering successfully on commit " + clusteringCommitTime); + } + + private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata, + HoodieInstant hoodieInstant) { + boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction()); + // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a + // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. 
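+    // The metadata writer is only present when the metadata table is in use, hence the ifPresent guard below.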
+ table.getMetadataWriter(hoodieInstant.getTimestamp()) + .ifPresent(writer -> ((HoodieTableMetadataWriter) writer).update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction)); + } + + /** + * Initialize the metadata table if needed. Creating the metadata table writer + * will trigger the initial bootstrapping from the data table. + * + * @param inFlightInstantTimestamp - The in-flight action responsible for the metadata table initialization + */ + protected void initializeMetadataTable(Option inFlightInstantTimestamp) { + if (config.isMetadataTableEnabled()) { + SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, + context, Option.empty(), inFlightInstantTimestamp); + } + } + + @Override + protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) { + return HoodieSparkTable.create(config, context); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index a142fd80d4bf8..2fc6f0c900631 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -29,29 +29,21 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.model.TableServiceType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; -import org.apache.hudi.exception.HoodieClusteringException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.SparkHoodieIndexFactory; -import org.apache.hudi.metadata.HoodieTableMetadataWriter; -import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.metrics.DistributedRegistry; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; -import org.apache.hudi.table.action.compact.CompactHelpers; -import org.apache.hudi.table.marker.WriteMarkersFactory; import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper; import com.codahale.metrics.Timer; @@ -62,10 +54,8 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; @SuppressWarnings("checkstyle:LineLength") public class SparkRDDWriteClient extends @@ -91,6 +81,7 @@ public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeC public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, Option timelineService) { super(context, writeConfig, timelineService, SparkUpgradeDowngradeHelper.getInstance()); + this.tableServiceClient = new 
SparkRDDTableServiceClient<>(context, writeConfig, getMetrics()); } /** @@ -290,126 +281,28 @@ protected JavaRDD postWrite(HoodieWriteMetadata> extraMetadata) { - HoodieSparkTable table = HoodieSparkTable.create(config, context); - extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata)); - completeCompaction(metadata, table, compactionInstantTime); + tableServiceClient.commitCompaction(compactionInstantTime, metadata, extraMetadata); } @Override protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) { - this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName()); - List writeStats = metadata.getWriteStats(); - final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime); - try { - this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty()); - finalizeWrite(table, compactionCommitTime, writeStats); - // commit to data table after committing to metadata table. - updateTableMetadata(table, metadata, compactionInstant); - LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata); - CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata); - } finally { - this.txnManager.endTransaction(Option.of(compactionInstant)); - } - WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime) - .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); - if (compactionTimer != null) { - long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); - HoodieActiveTimeline.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant -> - metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION) - ); - } - LOG.info("Compacted successfully on commit " + compactionCommitTime); + tableServiceClient.completeCompaction(metadata, table, compactionCommitTime); } @Override protected HoodieWriteMetadata> compact(String compactionInstantTime, boolean shouldComplete) { HoodieSparkTable table = HoodieSparkTable.create(config, context); preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient()); - HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); - HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); - if (pendingCompactionTimeline.containsInstant(inflightInstant)) { - table.rollbackInflightCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false)); - table.getMetaClient().reloadActiveTimeline(); - } - compactionTimer = metrics.getCompactionCtx(); - HoodieWriteMetadata> writeMetadata = table.compact(context, compactionInstantTime); - HoodieWriteMetadata> compactionMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses())); - if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) { - completeTableService(TableServiceType.COMPACT, compactionMetadata.getCommitMetadata().get(), table, compactionInstantTime); - } - return compactionMetadata; + return tableServiceClient.compact(compactionInstantTime, shouldComplete); } @Override public HoodieWriteMetadata> cluster(String clusteringInstant, boolean shouldComplete) { HoodieSparkTable table = HoodieSparkTable.create(config, context); 
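+    // Only the pre-write bookkeeping happens here; the actual clustering is delegated to the table service client below.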
preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient()); - HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline(); - HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant); - if (pendingClusteringTimeline.containsInstant(inflightInstant)) { - table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false)); - table.getMetaClient().reloadActiveTimeline(); - } - clusteringTimer = metrics.getClusteringCtx(); - LOG.info("Starting clustering at " + clusteringInstant); - HoodieWriteMetadata> writeMetadata = table.cluster(context, clusteringInstant); - HoodieWriteMetadata> clusteringMetadata = writeMetadata.clone(HoodieJavaRDD.getJavaRDD(writeMetadata.getWriteStatuses())); - // TODO : Where is shouldComplete used ? - if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) { - completeTableService(TableServiceType.CLUSTER, clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant); - } - return clusteringMetadata; - } - - private void completeClustering(HoodieReplaceCommitMetadata metadata, - HoodieTable table, - String clusteringCommitTime) { - List writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e -> - e.getValue().stream()).collect(Collectors.toList()); - - if (writeStats.stream().mapToLong(s -> s.getTotalWriteErrors()).sum() > 0) { - throw new HoodieClusteringException("Clustering failed to write to files:" - + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(","))); - } - - final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime); - try { - this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty()); - - finalizeWrite(table, clusteringCommitTime, writeStats); - // Update table's metadata (table) - updateTableMetadata(table, metadata, clusteringInstant); - - LOG.info("Committing Clustering " + clusteringCommitTime + ". Finished with result " + metadata); - - table.getActiveTimeline().transitionReplaceInflightToComplete( - clusteringInstant, - Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8))); - } catch (Exception e) { - throw new HoodieClusteringException("unable to transition clustering inflight to complete: " + clusteringCommitTime, e); - } finally { - this.txnManager.endTransaction(Option.of(clusteringInstant)); - } - WriteMarkersFactory.get(config.getMarkersType(), table, clusteringCommitTime) - .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); - if (clusteringTimer != null) { - long durationInMs = metrics.getDurationInMs(clusteringTimer.stop()); - HoodieActiveTimeline.parseDateFromInstantTimeSafely(clusteringCommitTime).ifPresent(parsedInstant -> - metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION) - ); - } - LOG.info("Clustering successfully on commit " + clusteringCommitTime); - } - - private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata, - HoodieInstant hoodieInstant) { - boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction()); - // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a - // single lock (single writer). 
Because more than one write to metadata table will result in conflicts since all of them updates the same partition. - table.getMetadataWriter(hoodieInstant.getTimestamp()) - .ifPresent(writer -> ((HoodieTableMetadataWriter) writer).update(commitMetadata, hoodieInstant.getTimestamp(), isTableServiceAction)); + return tableServiceClient.cluster(clusteringInstant, shouldComplete); } @Override @@ -418,43 +311,13 @@ protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option inFlightInstantTimestamp) { - if (config.isMetadataTableEnabled()) { - SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, - context, Option.empty(), inFlightInstantTimestamp); - } - } - - // TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy - private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata, - HoodieTable table, - String commitInstant) { - - switch (tableServiceType) { - case CLUSTER: - completeClustering((HoodieReplaceCommitMetadata) metadata, table, commitInstant); - break; - case COMPACT: - completeCompaction(metadata, table, commitInstant); - break; - default: - throw new IllegalArgumentException("This table service is not valid " + tableServiceType); - } - } - @Override protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) { // Create a Hoodie table after startTxn which encapsulated the commits and files visible. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTableManagerConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTableManagerConfig.java new file mode 100644 index 0000000000000..ab571c58bde96 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTableManagerConfig.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.config; + +import org.apache.hudi.common.model.ActionType; + +import javax.annotation.concurrent.Immutable; + +import java.util.Properties; + +/** + * Configurations used by the HUDI Table Management Service. 
+ */
+@Immutable
+@ConfigClassProperty(name = "Table Management Service Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations used by the Hudi Table Management Service.")
+public class HoodieTableManagerConfig extends HoodieConfig {
+
+  public static final String TABLE_MANAGEMENT_SERVICE_PREFIX = "hoodie.table.management.service";
+
+  public static final ConfigProperty<Boolean> TABLE_MANAGEMENT_SERVICE_ENABLE = ConfigProperty
+      .key(TABLE_MANAGEMENT_SERVICE_PREFIX + ".enable")
+      .defaultValue(false)
+      .withDocumentation("When enabled, table services on this table are delegated to the Hudi table management service");
+
+  public static final ConfigProperty<String> TABLE_MANAGEMENT_SERVICE_HOST = ConfigProperty
+      .key(TABLE_MANAGEMENT_SERVICE_PREFIX + ".host")
+      .defaultValue("localhost")
+      .withDocumentation("Table management service host");
+
+  public static final ConfigProperty<Integer> TABLE_MANAGEMENT_SERVICE_PORT = ConfigProperty
+      .key(TABLE_MANAGEMENT_SERVICE_PREFIX + ".port")
+      .defaultValue(26755)
+      .withDocumentation("Table management service port");
+
+  public static final ConfigProperty<String> TABLE_MANAGEMENT_SERVICE_ACTIONS = ConfigProperty
+      .key(TABLE_MANAGEMENT_SERVICE_PREFIX + ".actions")
+      .defaultValue("")
+      .withDocumentation("Actions to delegate to the table management service, separated by ':', e.g. 'compaction:clean'; defaults to empty, meaning no action is delegated");
+
+  public static final ConfigProperty<String> TABLE_MANAGEMENT_SERVICE_DEPLOY_USERNAME = ConfigProperty
+      .key(TABLE_MANAGEMENT_SERVICE_PREFIX + ".deploy.username")
+      .defaultValue("default")
+      .withDocumentation("The user name used to deploy table services for this table");
+
+  public static final ConfigProperty<String> TABLE_MANAGEMENT_SERVICE_DEPLOY_QUEUE = ConfigProperty
+      .key(TABLE_MANAGEMENT_SERVICE_PREFIX + ".deploy.queue")
+      .defaultValue("default")
+      .withDocumentation("The queue used to deploy table services for this table");
+
+  public static final ConfigProperty<String> TABLE_MANAGEMENT_SERVICE_DEPLOY_RESOURCE = ConfigProperty
+      .key(TABLE_MANAGEMENT_SERVICE_PREFIX + ".deploy.resource")
+      .defaultValue("4g:4g")
+      .withDocumentation("The resources used to deploy table services for this table; defaults to 4g for the driver and 4g for the executor");
+
+  public static final ConfigProperty<Integer> TABLE_MANAGEMENT_SERVICE_DEPLOY_PARALLELISM = ConfigProperty
+      .key(TABLE_MANAGEMENT_SERVICE_PREFIX + ".deploy.parallelism")
+      .defaultValue(100)
+      .withDocumentation("The max parallelism used to deploy table services for this table; defaults to 100");
+
+  public static final ConfigProperty<String> TABLE_MANAGEMENT_SERVICE_DEPLOY_EXECUTION_ENGINE = ConfigProperty
+      .key(TABLE_MANAGEMENT_SERVICE_PREFIX + ".execution.engine")
+      .defaultValue("spark")
+      .withDocumentation("The execution engine used to run table services for this table; defaults to 'spark'");
+
+  public static final ConfigProperty<String> TABLE_MANAGEMENT_SERVICE_DEPLOY_EXTRA_PARAMS = ConfigProperty
+      .key(TABLE_MANAGEMENT_SERVICE_PREFIX + ".deploy.extra.params")
+      .defaultValue("")
+      .withDocumentation("Extra parameters used to deploy table services for this table, separated by ';'");
+
+  public static final ConfigProperty<Integer> TABLE_MANAGEMENT_SERVICE_TIMEOUT = ConfigProperty
+      .key(TABLE_MANAGEMENT_SERVICE_PREFIX + ".timeout")
+      .defaultValue(300)
+      .withDocumentation("Connection timeout for the client");
+
+  public static final ConfigProperty<Integer> TABLE_MANAGEMENT_SERVICE_RETRIES = ConfigProperty
+      .key(TABLE_MANAGEMENT_SERVICE_PREFIX + ".connect.retries")
+      .defaultValue(3)
+      .withDocumentation("Number of retries while opening a connection to the table management service");
+
+  public static final ConfigProperty<Integer> TABLE_MANAGEMENT_SERVICE_RETRY_DELAY = ConfigProperty
+      .key(TABLE_MANAGEMENT_SERVICE_PREFIX + ".connect.retry.delay")
+      .defaultValue(1)
+      .withDocumentation("Number of seconds for the client to wait between consecutive connection attempts");
+
+  public static final ConfigProperty<Integer> TABLE_MANAGEMENT_SERVICE_TOLERABLE_NUM = ConfigProperty
+      .key(TABLE_MANAGEMENT_SERVICE_PREFIX + ".tolerable.num")
+      .defaultValue(0)
+      .withDocumentation("Number of unsuccessful connection attempts to the table management service that the client can tolerate");
+
+  public static HoodieTableManagerConfig.Builder newBuilder() {
+    return new HoodieTableManagerConfig.Builder();
+  }
+
+  public boolean enableTableManager() {
+    return getBoolean(TABLE_MANAGEMENT_SERVICE_ENABLE);
+  }
+
+  public String getTableManagerHost() {
+    return getStringOrDefault(TABLE_MANAGEMENT_SERVICE_HOST);
+  }
+
+  public Integer getTableManagerPort() {
+    return getIntOrDefault(TABLE_MANAGEMENT_SERVICE_PORT);
+  }
+
+  public String getTableManagerActions() {
+    return getStringOrDefault(TABLE_MANAGEMENT_SERVICE_ACTIONS);
+  }
+
+  public String getDeployUsername() {
+    return getStringOrDefault(TABLE_MANAGEMENT_SERVICE_DEPLOY_USERNAME);
+  }
+
+  public String getDeployQueue() {
+    return getStringOrDefault(TABLE_MANAGEMENT_SERVICE_DEPLOY_QUEUE);
+  }
+
+  public String getDeployResource() {
+    return getStringOrDefault(TABLE_MANAGEMENT_SERVICE_DEPLOY_RESOURCE);
+  }
+
+  public int getDeployParallelism() {
+    return getIntOrDefault(TABLE_MANAGEMENT_SERVICE_DEPLOY_PARALLELISM);
+  }
+
+  public String getDeployExtraParams() {
+    return getStringOrDefault(TABLE_MANAGEMENT_SERVICE_DEPLOY_EXTRA_PARAMS);
+  }
+
+  public String getDeployExecutionEngine() {
+    return getStringOrDefault(TABLE_MANAGEMENT_SERVICE_DEPLOY_EXECUTION_ENGINE);
+  }
+
+  public int getConnectionTimeout() {
+    return getIntOrDefault(TABLE_MANAGEMENT_SERVICE_TIMEOUT);
+  }
+
+  public int getConnectionRetryLimit() {
+    return getIntOrDefault(TABLE_MANAGEMENT_SERVICE_RETRIES);
+  }
+
+  public int getConnectionRetryDelay() {
+    return getIntOrDefault(TABLE_MANAGEMENT_SERVICE_RETRY_DELAY);
+  }
+
+  public int getConnectionTolerableNum() {
+    return getIntOrDefault(TABLE_MANAGEMENT_SERVICE_TOLERABLE_NUM);
+  }
+
+  public boolean isTableManagerSupportsAction(ActionType actionType) {
+    return enableTableManager() && getTableManagerActions().contains(actionType.name());
+  }
+
+  public static class Builder {
+    private final HoodieTableManagerConfig config = new HoodieTableManagerConfig();
+
+    public Builder fromProperties(Properties props) {
+      this.config.getProps().putAll(props);
+      return this;
+    }
+
+    public Builder withHost(String host) {
+      config.setValue(TABLE_MANAGEMENT_SERVICE_HOST, host);
+      return this;
+    }
+
+    public Builder withPort(String port) {
+      config.setValue(TABLE_MANAGEMENT_SERVICE_PORT, port);
+      return this;
+    }
+
+    public HoodieTableManagerConfig build() {
+      config.setDefaults(HoodieTableManagerConfig.class.getName());
+      return config;
+    }
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index a9e10d3e55eb1..f8f7044297dfd 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import
org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -83,6 +84,12 @@ private FlinkOptions() { // Common Options // ------------------------------------------------------------------------ + public static final ConfigOption DATABASE_NAME = ConfigOptions + .key(HoodieTableConfig.DATABASE_NAME.key()) + .stringType() + .noDefaultValue() + .withDescription("Database name to register to Hive metastore"); + public static final ConfigOption TABLE_NAME = ConfigOptions .key(HoodieWriteConfig.TBL_NAME.key()) .stringType() diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 5a42f79aff1c7..2e6c55660dbf5 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -307,6 +307,7 @@ public static HoodieTableMetaClient initTableIfNotExists( .setTableCreateSchema(conf.getString(FlinkOptions.SOURCE_AVRO_SCHEMA)) .setTableType(conf.getString(FlinkOptions.TABLE_TYPE)) .setTableName(conf.getString(FlinkOptions.TABLE_NAME)) + .setDatabaseName(conf.getString(FlinkOptions.DATABASE_NAME)) .setRecordKeyFields(conf.getString(FlinkOptions.RECORD_KEY_FIELD, null)) .setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS_NAME)) .setPreCombineField(OptionsResolver.getPreCombineField(conf)) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala index 18ea636c05719..09d163a75858f 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala @@ -112,7 +112,7 @@ class RunClusteringProcedure extends BaseProcedure logInfo(s"Clustering instants to run: ${pendingClustering.mkString(",")}.") val startTs = System.currentTimeMillis() - pendingClustering.foreach(client.cluster(_, true)) + pendingClustering.foreach(client.cluster) logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," + s" time cost: ${System.currentTimeMillis() - startTs}ms.") diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java index f6905f92d9440..3c97b732eb62a 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java @@ -196,7 +196,7 @@ private int doCluster(JavaSparkContext jsc) throws Exception { throw new HoodieClusteringException("There is no scheduled clustering in the table."); } } - Option commitMetadata = client.cluster(cfg.clusteringInstantTime, true).getCommitMetadata(); + Option commitMetadata = client.cluster(cfg.clusteringInstantTime).getCommitMetadata(); return UtilHelpers.handleErrors(commitMetadata.get(), cfg.clusteringInstantTime); } @@ -252,7 +252,7 @@ 
private int doScheduleAndCluster(JavaSparkContext jsc) throws Exception { LOG.info("The schedule instant time is " + instantTime.get()); LOG.info("Step 2: Do cluster"); - Option metadata = client.cluster(instantTime.get(), true).getCommitMetadata(); + Option metadata = client.cluster(instantTime.get()).getCommitMetadata(); return UtilHelpers.handleErrors(metadata.get(), instantTime.get()); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 3fbb8c0f8d48c..fbccd973256ff 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -330,7 +330,7 @@ public Pair, JavaRDD> syncOnce() throws IOException if (cfg.retryLastPendingInlineClusteringJob && getHoodieClientConfig(this.schemaProvider).inlineClusteringEnabled()) { Option pendingClusteringInstant = getLastPendingClusteringInstant(allCommitsTimelineOpt); if (pendingClusteringInstant.isPresent()) { - writeClient.cluster(pendingClusteringInstant.get(), true); + writeClient.cluster(pendingClusteringInstant.get()); } }
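
Note for reviewers: the sketch below is not part of the patch. It is a minimal illustration, under stated assumptions, of how a writer-side process could build the new HoodieTableManagerConfig from properties and decide whether an action should be delegated, using only the Builder and accessors introduced in this change. The host, port and action list are placeholder values, not recommendations.

import org.apache.hudi.common.config.HoodieTableManagerConfig;
import org.apache.hudi.common.model.ActionType;

import java.util.Properties;

public class TableManagerConfigExample {
  public static void main(String[] args) {
    // Placeholder writer properties using the "hoodie.table.management.service" prefix.
    Properties props = new Properties();
    props.setProperty("hoodie.table.management.service.enable", "true");
    props.setProperty("hoodie.table.management.service.actions", "compaction:clean");

    // Build the config through the Builder added in this patch; host and port are
    // assumptions for a locally running table management service instance.
    HoodieTableManagerConfig config = HoodieTableManagerConfig.newBuilder()
        .fromProperties(props)
        .withHost("localhost")
        .withPort("26755")
        .build();

    // isTableManagerSupportsAction() requires the service to be enabled and the
    // action name to appear in the configured action list.
    System.out.println("Delegate compaction: " + config.isTableManagerSupportsAction(ActionType.compaction));
    System.out.println("Delegate clean: " + config.isTableManagerSupportsAction(ActionType.clean));
  }
}

Separately, the updated call sites in RunClusteringProcedure, HoodieClusteringJob and DeltaSync now invoke client.cluster(instantTime) without the shouldComplete flag, which suggests a single-argument overload that completes the clustering by default; the two-argument cluster(instantTime, shouldComplete) shown above still exists on SparkRDDWriteClient, so callers of the old signature are unaffected.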
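
The patch also threads a database name through the Flink writer path: FlinkOptions.DATABASE_NAME is new, and StreamerUtil.initTableIfNotExists() forwards it to the meta client builder. Below is a minimal sketch of a job configuration that sets it, assuming the standard Flink Configuration API; the database, table and path values are placeholders.

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.configuration.FlinkOptions;

public class FlinkDatabaseNameExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // New option from this patch: the database name is recorded in the table config
    // when StreamerUtil.initTableIfNotExists(conf) bootstraps the table.
    conf.set(FlinkOptions.DATABASE_NAME, "hudi_demo_db");       // placeholder database
    conf.set(FlinkOptions.TABLE_NAME, "hudi_demo_table");       // placeholder table
    conf.set(FlinkOptions.PATH, "file:///tmp/hudi_demo_table"); // placeholder base path
  }
}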