From 80c9e23b6103453e4e7c48005da5c35ecc9413f7 Mon Sep 17 00:00:00 2001 From: William Lo Date: Mon, 20 Nov 2023 16:09:21 -0500 Subject: [PATCH 1/8] WIP --- .../gobblin/runtime/SafeDatasetCommit.java | 4 +- .../runtime/TaskStateCollectorService.java | 87 +++++---- .../temporal/ddm/activity/CommitActivity.java | 32 +++ .../ddm/activity/impl/CommitActivityImpl.java | 183 ++++++++++++++++++ .../commit/LineageEventProduceWorkflow.java | 6 + .../launcher/ProcessWorkUnitsJobLauncher.java | 2 +- .../ddm/worker/WorkFulfillmentWorker.java | 6 +- .../ddm/workflow/CommitStepWorkflow.java | 36 ++++ .../workflow/impl/CommitStepWorkflowImpl.java | 40 ++++ ...tingExecOfProcessWorkUnitWorkflowImpl.java | 1 + .../impl/ProcessWorkUnitsWorkflowImpl.java | 18 +- 11 files changed, 368 insertions(+), 47 deletions(-) create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/commit/LineageEventProduceWorkflow.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java index a2885c168a9..90676b56110 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java @@ -63,7 +63,7 @@ */ @RequiredArgsConstructor @Slf4j -final class SafeDatasetCommit implements Callable { +public final class SafeDatasetCommit implements Callable { private static final Object GLOBAL_LOCK = new Object(); @@ -316,12 +316,12 @@ private void finalizeDatasetStateBeforeCommit(JobState.DatasetState datasetState if (taskState.getWorkingState() != WorkUnitState.WorkingState.SUCCESSFUL && this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) { // The dataset state is set to FAILED if any task failed and COMMIT_ON_FULL_SUCCESS is used + log.info("Failed task state for " + taskState.getWorkunit().getOutputFilePath()); datasetState.setState(JobState.RunningState.FAILED); datasetState.incrementJobFailures(); Optional taskStateException = taskState.getTaskFailureException(); log.warn("At least one task did not get committed successfully. Setting dataset state to FAILED. " + (taskStateException.isPresent() ? taskStateException.get() : "Exception not set.")); - return; } } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java index 43164638573..87a82a34b39 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java @@ -18,6 +18,7 @@ package org.apache.gobblin.runtime; import java.io.IOException; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,8 +30,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.base.Optional; import com.google.common.base.Predicate; @@ -68,8 +67,6 @@ @Slf4j public class TaskStateCollectorService extends AbstractScheduledService { - private static final Logger LOGGER = LoggerFactory.getLogger(TaskStateCollectorService.class); - private final JobState jobState; private final EventBus eventBus; @@ -166,13 +163,13 @@ protected Scheduler scheduler() { @Override protected void startUp() throws Exception { - LOGGER.info("Starting the " + TaskStateCollectorService.class.getSimpleName()); + log.info("Starting the " + TaskStateCollectorService.class.getSimpleName()); super.startUp(); } @Override protected void shutDown() throws Exception { - LOGGER.info("Stopping the " + TaskStateCollectorService.class.getSimpleName()); + log.info("Stopping the " + TaskStateCollectorService.class.getSimpleName()); try { runOneIteration(); } finally { @@ -193,39 +190,11 @@ protected void shutDown() throws Exception { * @throws IOException if it fails to collect the output {@link TaskState}s */ private void collectOutputTaskStates() throws IOException { - List taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate() { - @Override - public boolean apply(String input) { - return input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX) - && !input.startsWith(FsStateStore.TMP_FILE_PREFIX); - }}); - if (taskStateNames == null || taskStateNames.size() == 0) { - LOGGER.debug("No output task state files found in " + this.outputTaskStateDir); + final Queue taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir, this.stateSerDeRunnerThreads); + if (taskStateQueue == null) { return; } - - final Queue taskStateQueue = Queues.newConcurrentLinkedQueue(); - try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, null)) { - for (final String taskStateName : taskStateNames) { - LOGGER.debug("Found output task state file " + taskStateName); - // Deserialize the TaskState and delete the file - stateSerDeRunner.submitCallable(new Callable() { - @Override - public Void call() throws Exception { - TaskState taskState = taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0); - taskStateQueue.add(taskState); - taskStateStore.delete(outputTaskStateDir.getName(), taskStateName); - return null; - } - }, "Deserialize state for " + taskStateName); - } - } catch (IOException ioe) { - LOGGER.warn("Could not read all task state files."); - } - - LOGGER.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size())); - // Add the TaskStates of completed tasks to the JobState so when the control // returns to the launcher, it sees the TaskStates of all completed tasks. for (TaskState taskState : taskStateQueue) { @@ -241,7 +210,7 @@ public Void call() throws Exception { // Finish any additional steps defined in handler on driver level. // Currently implemented handler for Hive registration only. if (optionalTaskCollectorHandler.isPresent()) { - LOGGER.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks"); + log.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks"); try { optionalTaskCollectorHandler.get().handle(taskStateQueue); @@ -259,6 +228,42 @@ public Void call() throws Exception { this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue))); } + public static Queue deserializeTaskStatesFromFolder(StateStore taskStateStore, Path outputTaskStateDir, + int numDeserializerThreads) throws IOException { + List taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate() { + @Override + public boolean apply(String input) { + return input != null + && input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX) + && !input.startsWith(FsStateStore.TMP_FILE_PREFIX); + }}); + + if (taskStateNames == null || taskStateNames.isEmpty()) { + log.info("No output task state files found in " + outputTaskStateDir); + return null; + } + + final Queue taskStateQueue = Queues.newConcurrentLinkedQueue(); + try (ParallelRunner stateSerDeRunner = new ParallelRunner(numDeserializerThreads, null)) { + for (final String taskStateName : taskStateNames) { + log.info("Found output task state file " + taskStateName); + // Deserialize the TaskState and delete the file + stateSerDeRunner.submitCallable(new Callable() { + @Override + public Void call() throws Exception { + TaskState taskState = taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0); + taskStateQueue.add(taskState); + return null; + } + }, "Deserialize state for " + taskStateName); + } + } catch (IOException ioe) { + log.warn("Could not read all task state files."); + } + log.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size())); + return taskStateQueue; + } + /** * Uses the size of work units to determine a job's progress and reports the progress as a percentage via * GobblinTrackingEvents @@ -267,7 +272,7 @@ public Void call() throws Exception { private void reportJobProgress(TaskState taskState) { String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_SIZE); if (stringSize == null) { - LOGGER.warn("Expected to report job progress but work unit byte size property null"); + log.warn("Expected to report job progress but work unit byte size property null"); return; } @@ -275,7 +280,7 @@ private void reportJobProgress(TaskState taskState) { // If progress reporting is enabled, value should be present if (!this.jobState.contains(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE)) { - LOGGER.warn("Expected to report job progress but total bytes to copy property null"); + log.warn("Expected to report job progress but total bytes to copy property null"); return; } this.totalSizeToCopy = this.jobState.getPropAsLong(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE); @@ -287,7 +292,7 @@ private void reportJobProgress(TaskState taskState) { this.workUnitsCompletedSoFar += 1; if (this.totalNumWorkUnits == 0) { - LOGGER.warn("Expected to report job progress but work units are not countable"); + log.warn("Expected to report job progress but work units are not countable"); return; } newPercentageCopied = this.workUnitsCompletedSoFar / this.totalNumWorkUnits; @@ -307,7 +312,7 @@ private void reportJobProgress(TaskState taskState) { Map progress = new HashMap<>(); progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport)); - LOGGER.info("Sending copy progress event with percentage " + percentageToReport + "%"); + log.info("Sending copy progress event with percentage " + percentageToReport + "%"); new TimingEvent(this.eventSubmitter, TimingEvent.JOB_COMPLETION_PERCENTAGE).stop(progress); } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java new file mode 100644 index 00000000000..d690fafcc15 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; +import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; + + +/** Activity for processing/executing a {@link org.apache.gobblin.source.workunit.WorkUnit}, provided by claim-check */ +@ActivityInterface +public interface CommitActivity { + @ActivityMethod + // CAUTION: void return type won't work, as apparently it mayn't be the return type for `io.temporal.workflow.Functions.Func1`! + int commit(WUProcessingSpec workSpec); +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java new file mode 100644 index 00000000000..dd9a605db1b --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.activity.impl; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.typesafe.config.ConfigFactory; +import io.temporal.failure.ApplicationFailure; +import java.io.IOException; +import java.net.URI; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; +import org.apache.gobblin.commit.DeliverySemantics; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metastore.FsStateStore; +import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.runtime.JobContext; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.SafeDatasetCommit; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.runtime.TaskStateCollectorService; +import org.apache.gobblin.source.extractor.JobCommitPolicy; +import org.apache.gobblin.temporal.ddm.activity.CommitActivity; +import org.apache.gobblin.temporal.ddm.util.JobStateUtils; +import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; +import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; +import org.apache.gobblin.temporal.ddm.work.assistance.Help; +import org.apache.gobblin.util.Either; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.util.HadoopUtils; +import org.apache.gobblin.util.SerializationUtils; +import org.apache.gobblin.util.executors.IteratorExecutor; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + + +@Slf4j +public class CommitActivityImpl implements CommitActivity { + + @Override + public int commit(WUProcessingSpec workSpec) { + int numDeserializationThreads = 1; + try { + FileSystem fs = Help.loadFileSystem(workSpec); + JobState jobState = Help.loadJobState(workSpec, fs); + SharedResourcesBroker instanceBroker = createDefaultInstanceBroker(jobState.getProperties()); + JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, null); + // TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir (as in the case of MR which generates workunits) + Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent(); + Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName()); + log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri()); + StateStore taskStateStore = Help.openTaskStateStore(workSpec, fs); + Collection taskStateQueue = + ImmutableList.copyOf( + TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath, numDeserializationThreads)); + commitTaskStates(jobState, taskStateQueue, globalGobblinContext); + + } catch (Exception e) { + //TODO: IMPROVE GRANULARITY OF RETRIES + throw ApplicationFailure.newNonRetryableFailureWithCause( + "Failed to commit dataset state for some dataset(s) of job ", + IOException.class.toString(), + new IOException(e), + null + ); + } + + return 0; + } + + protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig) throws IOException { + return HadoopUtils.getFileSystem(fsUri, stateConfig); + } + + void commitTaskStates(State jobState, Collection taskStates, JobContext jobContext) throws IOException { + Map datasetStatesByUrns = createDatasetStatesByUrns(taskStates); + final boolean shouldCommitDataInJob = shouldCommitDataInJob(jobState); + final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE; + final int numCommitThreads = 1; + + if (!shouldCommitDataInJob) { + log.info("Job will not commit data since data are committed by tasks."); + } + + try { + if (!datasetStatesByUrns.isEmpty()) { + log.info("Persisting dataset urns."); + } + + List> result = new IteratorExecutor<>(Iterables + .transform(datasetStatesByUrns.entrySet(), + new Function, Callable>() { + @Nullable + @Override + public Callable apply(final Map.Entry entry) { + return new SafeDatasetCommit(shouldCommitDataInJob, false, deliverySemantics, entry.getKey(), + entry.getValue(), false, jobContext); + } + }).iterator(), numCommitThreads, + ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("Commit-thread-%d"))) + .executeAndGetResults(); + + IteratorExecutor.logFailures(result, null, 10); + + if (!IteratorExecutor.verifyAllSuccessful(result)) { + // TODO: propagate cause of failure + throw new IOException("Failed to commit dataset state for some dataset(s) of job "); + } + } catch (InterruptedException exc) { + throw new IOException(exc); + } + } + + public Map createDatasetStatesByUrns(Collection taskStates) { + Map datasetStatesByUrns = Maps.newHashMap(); + + //TODO: handle skipped tasks? + for (TaskState taskState : taskStates) { + String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState); + datasetStatesByUrns.get(datasetUrn).incrementTaskCount(); + datasetStatesByUrns.get(datasetUrn).addTaskState(taskState); + } + + return datasetStatesByUrns; + } + + private String createDatasetUrn(Map datasetStatesByUrns, TaskState taskState) { + String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN); + if (!datasetStatesByUrns.containsKey(datasetUrn)) { + JobState.DatasetState datasetState = new JobState.DatasetState(); + datasetState.setDatasetUrn(datasetUrn); + datasetStatesByUrns.put(datasetUrn, datasetState); + } + return datasetUrn; + } + + private static boolean shouldCommitDataInJob(State state) { + boolean jobCommitPolicyIsFull = + JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS; + boolean publishDataAtJobLevel = state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL, + ConfigurationKeys.DEFAULT_PUBLISH_DATA_AT_JOB_LEVEL); + boolean jobDataPublisherSpecified = + !Strings.isNullOrEmpty(state.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE)); + return jobCommitPolicyIsFull || publishDataAtJobLevel || jobDataPublisherSpecified; + } + + private static SharedResourcesBroker createDefaultInstanceBroker(Properties jobProps) { + log.warn("Creating a job specific {}. Objects will only be shared at the job level.", + SharedResourcesBroker.class.getSimpleName()); + return SharedResourcesBrokerFactory.createDefaultTopLevelBroker(ConfigFactory.parseProperties(jobProps), + GobblinScopeTypes.GLOBAL.defaultScopeInstance()); + } + +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/commit/LineageEventProduceWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/commit/LineageEventProduceWorkflow.java new file mode 100644 index 00000000000..4904b356d2f --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/commit/LineageEventProduceWorkflow.java @@ -0,0 +1,6 @@ +package org.apache.gobblin.temporal.ddm.commit; + +public class LineageEventProduceWorkflow { + + +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java index 95425a64371..88838ca6a88 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java @@ -17,6 +17,7 @@ package org.apache.gobblin.temporal.ddm.launcher; +import io.temporal.client.WorkflowOptions; import java.net.URI; import java.util.List; import java.util.Properties; @@ -25,7 +26,6 @@ import lombok.extern.slf4j.Slf4j; import com.typesafe.config.ConfigFactory; -import io.temporal.client.WorkflowOptions; import org.apache.hadoop.fs.Path; import org.apache.gobblin.metrics.Tag; diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java index 9af6995b509..d425bbc4b15 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java @@ -24,7 +24,9 @@ import io.temporal.worker.WorkerOptions; import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker; +import org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl; import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl; +import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl; import org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl; import org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl; @@ -40,12 +42,12 @@ public WorkFulfillmentWorker(Config config, WorkflowClient workflowClient) { @Override protected Class[] getWorkflowImplClasses() { - return new Class[] { ProcessWorkUnitsWorkflowImpl.class, NestingExecOfProcessWorkUnitWorkflowImpl.class }; + return new Class[] { ProcessWorkUnitsWorkflowImpl.class, NestingExecOfProcessWorkUnitWorkflowImpl.class, CommitStepWorkflowImpl.class }; } @Override protected Object[] getActivityImplInstances() { - return new Object[] { new ProcessWorkUnitImpl() }; + return new Object[] { new ProcessWorkUnitImpl(), new CommitActivityImpl() }; } @Override diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java new file mode 100644 index 00000000000..4f509312cc3 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.ddm.workflow; + +import io.temporal.workflow.Promise; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; +import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; + + +@WorkflowInterface +public interface CommitStepWorkflow { + + /** + * This is the method that is executed when the Workflow Execution is started. The Workflow + * Execution completes when this method finishes execution. + */ + @WorkflowMethod + int commit(WUProcessingSpec workSpec); +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java new file mode 100644 index 00000000000..6049546dee4 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java @@ -0,0 +1,40 @@ +package org.apache.gobblin.temporal.ddm.workflow.impl; + +import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; +import io.temporal.workflow.Async; +import io.temporal.workflow.Promise; +import io.temporal.workflow.Workflow; + +import java.time.Duration; + +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.temporal.ddm.activity.CommitActivity; +import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; +import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; +import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow; + + +@Slf4j +public class CommitStepWorkflowImpl implements CommitStepWorkflow { + + private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder() + .setInitialInterval(Duration.ofSeconds(3)) + .setMaximumInterval(Duration.ofSeconds(100)) + .setBackoffCoefficient(2) + .setMaximumAttempts(4) + .build(); + + private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(999)) + .setRetryOptions(ACTIVITY_RETRY_OPTS) + .build(); + + private final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, ACTIVITY_OPTS); + + @Override + public int commit(WUProcessingSpec workSpec) { + Promise result = Async.function(activityStub::commit, workSpec); + return result.get(); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java index 074bb460972..eaba17bf388 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java @@ -25,6 +25,7 @@ import java.time.Duration; import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit; +import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl; diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java index eafc624096e..abe7fa3ee20 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java @@ -23,12 +23,14 @@ import io.temporal.api.enums.v1.ParentClosePolicy; import io.temporal.workflow.ChildWorkflowOptions; import io.temporal.workflow.Workflow; +import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.temporal.cluster.WorkerConfig; import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; import org.apache.gobblin.temporal.ddm.work.assistance.Help; import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful; +import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow; import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow; import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload; import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr; @@ -36,6 +38,7 @@ import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow; +@Slf4j public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow { public static final String CHILD_WORKFLOW_ID_BASE = "NestingExecWorkUnits"; @@ -43,10 +46,16 @@ public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow { public int process(WUProcessingSpec workSpec) { Workload workload = createWorkload(workSpec); NestingExecWorkflow processingWorkflow = createProcessingWorkflow(workSpec); - return processingWorkflow.performWorkload( + int workunitsProcessed = processingWorkflow.performWorkload( WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty() ); + if (workunitsProcessed > 0) { + CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(); + int result = commitWorkflow.commit(workSpec); + return result; + } + return workunitsProcessed; } protected Workload createWorkload(WUProcessingSpec workSpec) { @@ -61,4 +70,11 @@ protected NestingExecWorkflow createProcessingWorkflow(FileS // TODO: to incorporate multiple different concrete `NestingExecWorkflow` sub-workflows in the same super-workflow... shall we use queues?!?!? return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts); } + + protected CommitStepWorkflow createCommitStepWorkflow() { + ChildWorkflowOptions childOpts = + ChildWorkflowOptions.newBuilder().setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON).build(); + + return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts); + } } From a8df7227ce1cb07abfa9e3c996d629ece30ede3d Mon Sep 17 00:00:00 2001 From: William Lo Date: Mon, 20 Nov 2023 19:50:19 -0500 Subject: [PATCH 2/8] fix checkstyle --- .../java/org/apache/gobblin/runtime/SafeDatasetCommit.java | 1 + .../apache/gobblin/runtime/TaskStateCollectorService.java | 1 - .../temporal/ddm/commit/LineageEventProduceWorkflow.java | 6 ------ 3 files changed, 1 insertion(+), 7 deletions(-) delete mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/commit/LineageEventProduceWorkflow.java diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java index 90676b56110..f8e4f5ecba8 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java @@ -403,6 +403,7 @@ private void finalizeDatasetState(JobState.DatasetState datasetState, String dat log.warn("At least one task in {} did not get committed successfully. Setting dataset state to FAILED. {}", datasetUrn, taskStateException); } + return; } } } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java index 87a82a34b39..49f4c29f93b 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java @@ -18,7 +18,6 @@ package org.apache.gobblin.runtime; import java.io.IOException; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/commit/LineageEventProduceWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/commit/LineageEventProduceWorkflow.java deleted file mode 100644 index 4904b356d2f..00000000000 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/commit/LineageEventProduceWorkflow.java +++ /dev/null @@ -1,6 +0,0 @@ -package org.apache.gobblin.temporal.ddm.commit; - -public class LineageEventProduceWorkflow { - - -} From b17d5f1b0906813506a3bd957b21bfb87d725336 Mon Sep 17 00:00:00 2001 From: William Lo Date: Mon, 20 Nov 2023 19:58:22 -0500 Subject: [PATCH 3/8] Fix tests --- .../main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java index f8e4f5ecba8..673b11ab2a4 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java @@ -322,6 +322,7 @@ private void finalizeDatasetStateBeforeCommit(JobState.DatasetState datasetState Optional taskStateException = taskState.getTaskFailureException(); log.warn("At least one task did not get committed successfully. Setting dataset state to FAILED. " + (taskStateException.isPresent() ? taskStateException.get() : "Exception not set.")); + return; } } @@ -403,7 +404,6 @@ private void finalizeDatasetState(JobState.DatasetState datasetState, String dat log.warn("At least one task in {} did not get committed successfully. Setting dataset state to FAILED. {}", datasetUrn, taskStateException); } - return; } } } From aa5ce445514ac573b7b268c44fdfc8c1d0f85ef1 Mon Sep 17 00:00:00 2001 From: William Lo Date: Tue, 21 Nov 2023 21:13:20 -0500 Subject: [PATCH 4/8] Address review first pass --- .../apache/gobblin/runtime/JobContext.java | 2 +- .../gobblin/runtime/SafeDatasetCommit.java | 5 +- .../runtime/TaskStateCollectorService.java | 18 ++++-- .../temporal/ddm/activity/CommitActivity.java | 10 +++- .../ddm/activity/impl/CommitActivityImpl.java | 55 +++++++++---------- .../ddm/workflow/CommitStepWorkflow.java | 9 +-- .../workflow/impl/CommitStepWorkflowImpl.java | 23 ++++++-- ...tingExecOfProcessWorkUnitWorkflowImpl.java | 1 - .../impl/ProcessWorkUnitsWorkflowImpl.java | 7 ++- 9 files changed, 78 insertions(+), 52 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java index 89d1dc41b14..c2bdc592d1a 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java @@ -543,7 +543,7 @@ public static Optional> getJobDataPublisherClass( * Data should be committed by the job if either {@link ConfigurationKeys#JOB_COMMIT_POLICY_KEY} is set to "full", * or {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to true. */ - private static boolean shouldCommitDataInJob(State state) { + public static boolean shouldCommitDataInJob(State state) { boolean jobCommitPolicyIsFull = JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS; boolean publishDataAtJobLevel = state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL, diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java index 673b11ab2a4..e07083ff1d3 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java @@ -316,12 +316,11 @@ private void finalizeDatasetStateBeforeCommit(JobState.DatasetState datasetState if (taskState.getWorkingState() != WorkUnitState.WorkingState.SUCCESSFUL && this.jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) { // The dataset state is set to FAILED if any task failed and COMMIT_ON_FULL_SUCCESS is used - log.info("Failed task state for " + taskState.getWorkunit().getOutputFilePath()); datasetState.setState(JobState.RunningState.FAILED); datasetState.incrementJobFailures(); Optional taskStateException = taskState.getTaskFailureException(); - log.warn("At least one task did not get committed successfully. Setting dataset state to FAILED. " - + (taskStateException.isPresent() ? taskStateException.get() : "Exception not set.")); + log.warn("Failed task state for {} At least one task did not get committed successfully. Setting dataset state to FAILED. {}" , + taskState.getWorkunit().getOutputFilePath(), taskStateException.isPresent() ? taskStateException.get() : "Exception not set."); return; } } diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java index 49f4c29f93b..baf6d95e37d 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java @@ -227,6 +227,15 @@ private void collectOutputTaskStates() throws IOException { this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue))); } + /** + * Reads in a {@link FsStateStore} folder used to store Task state outputs, and returns a queue of {@link TaskState}s + * Task State files are populated by the {@link GobblinMultiTaskAttempt} to record the output of remote concurrent tasks (e.g. MR mappers) + * @param taskStateStore + * @param outputTaskStateDir + * @param numDeserializerThreads + * @return Queue of TaskStates + * @throws IOException + */ public static Queue deserializeTaskStatesFromFolder(StateStore taskStateStore, Path outputTaskStateDir, int numDeserializerThreads) throws IOException { List taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate() { @@ -238,28 +247,29 @@ public boolean apply(String input) { }}); if (taskStateNames == null || taskStateNames.isEmpty()) { - log.info("No output task state files found in " + outputTaskStateDir); + log.warn("No output task state files found in " + outputTaskStateDir); return null; } final Queue taskStateQueue = Queues.newConcurrentLinkedQueue(); try (ParallelRunner stateSerDeRunner = new ParallelRunner(numDeserializerThreads, null)) { for (final String taskStateName : taskStateNames) { - log.info("Found output task state file " + taskStateName); + log.debug("Found output task state file " + taskStateName); // Deserialize the TaskState and delete the file stateSerDeRunner.submitCallable(new Callable() { @Override public Void call() throws Exception { TaskState taskState = taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0); taskStateQueue.add(taskState); + taskStateStore.delete(outputTaskStateDir.getName(), taskStateName); return null; } }, "Deserialize state for " + taskStateName); } } catch (IOException ioe) { - log.warn("Could not read all task state files."); + log.error("Could not read all task state files due to", ioe); } - log.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size())); + log.info(String.format("Collected task state of %d completed tasks in %s", taskStateQueue.size(), outputTaskStateDir)); return taskStateQueue; } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java index d690fafcc15..1f29a7abbe7 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java @@ -20,13 +20,17 @@ import io.temporal.activity.ActivityInterface; import io.temporal.activity.ActivityMethod; import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; -import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; -/** Activity for processing/executing a {@link org.apache.gobblin.source.workunit.WorkUnit}, provided by claim-check */ +/** Activity for reading the output of work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl} by + * reading in a {@link WUProcessingSpec} to determine the location of the output task states */ @ActivityInterface public interface CommitActivity { + /** + * Commit the output of the work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl} + * @param workSpec + * @return number of workunits committed + */ @ActivityMethod - // CAUTION: void return type won't work, as apparently it mayn't be the return type for `io.temporal.workflow.Functions.Func1`! int commit(WUProcessingSpec workSpec); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java index dd9a605db1b..cc6089be161 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java @@ -41,7 +41,6 @@ import org.apache.gobblin.commit.DeliverySemantics; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; -import org.apache.gobblin.metastore.FsStateStore; import org.apache.gobblin.metastore.StateStore; import org.apache.gobblin.runtime.JobContext; import org.apache.gobblin.runtime.JobState; @@ -52,12 +51,10 @@ import org.apache.gobblin.temporal.ddm.activity.CommitActivity; import org.apache.gobblin.temporal.ddm.util.JobStateUtils; import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; -import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; import org.apache.gobblin.temporal.ddm.work.assistance.Help; import org.apache.gobblin.util.Either; import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.HadoopUtils; -import org.apache.gobblin.util.SerializationUtils; import org.apache.gobblin.util.executors.IteratorExecutor; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -66,13 +63,16 @@ @Slf4j public class CommitActivityImpl implements CommitActivity { + static int DEFAULT_NUM_DESERIALIZATION_THREADS = 10; + static int DEFAULT_NUM_COMMIT_THREADS = 1; @Override public int commit(WUProcessingSpec workSpec) { - int numDeserializationThreads = 1; + // TODO: Make this configurable + int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS; try { FileSystem fs = Help.loadFileSystem(workSpec); JobState jobState = Help.loadJobState(workSpec, fs); - SharedResourcesBroker instanceBroker = createDefaultInstanceBroker(jobState.getProperties()); + SharedResourcesBroker instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState); JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, null); // TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir (as in the case of MR which generates workunits) Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent(); @@ -83,7 +83,7 @@ public int commit(WUProcessingSpec workSpec) { ImmutableList.copyOf( TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath, numDeserializationThreads)); commitTaskStates(jobState, taskStateQueue, globalGobblinContext); - + return taskStateQueue.size(); } catch (Exception e) { //TODO: IMPROVE GRANULARITY OF RETRIES throw ApplicationFailure.newNonRetryableFailureWithCause( @@ -93,27 +93,28 @@ public int commit(WUProcessingSpec workSpec) { null ); } - - return 0; - } - - protected FileSystem loadFileSystemForUri(URI fsUri, State stateConfig) throws IOException { - return HadoopUtils.getFileSystem(fsUri, stateConfig); } + /** + * Commit task states to the dataset state store. + * @param jobState + * @param taskStates + * @param jobContext + * @throws IOException + */ void commitTaskStates(State jobState, Collection taskStates, JobContext jobContext) throws IOException { Map datasetStatesByUrns = createDatasetStatesByUrns(taskStates); - final boolean shouldCommitDataInJob = shouldCommitDataInJob(jobState); + final boolean shouldCommitDataInJob = JobContext.shouldCommitDataInJob(jobState); final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE; - final int numCommitThreads = 1; - + //TODO: Make this configurable + final int numCommitThreads = DEFAULT_NUM_COMMIT_THREADS; if (!shouldCommitDataInJob) { log.info("Job will not commit data since data are committed by tasks."); } try { if (!datasetStatesByUrns.isEmpty()) { - log.info("Persisting dataset urns."); + log.info("Persisting {} dataset urns.", datasetStatesByUrns.size()); } List> result = new IteratorExecutor<>(Iterables @@ -133,14 +134,20 @@ public Callable apply(final Map.Entry entry if (!IteratorExecutor.verifyAllSuccessful(result)) { // TODO: propagate cause of failure - throw new IOException("Failed to commit dataset state for some dataset(s) of job "); + String jobName = jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, ""); + throw new IOException("Failed to commit dataset state for some dataset(s) of job " + jobName); } } catch (InterruptedException exc) { throw new IOException(exc); } } - public Map createDatasetStatesByUrns(Collection taskStates) { + /** + * Organize task states by dataset urns. + * @param taskStates + * @return + */ + public static Map createDatasetStatesByUrns(Collection taskStates) { Map datasetStatesByUrns = Maps.newHashMap(); //TODO: handle skipped tasks? @@ -153,7 +160,7 @@ public Map createDatasetStatesByUrns(Collection datasetStatesByUrns, TaskState taskState) { + private static String createDatasetUrn(Map datasetStatesByUrns, TaskState taskState) { String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN); if (!datasetStatesByUrns.containsKey(datasetUrn)) { JobState.DatasetState datasetState = new JobState.DatasetState(); @@ -163,16 +170,6 @@ private String createDatasetUrn(Map datasetStates return datasetUrn; } - private static boolean shouldCommitDataInJob(State state) { - boolean jobCommitPolicyIsFull = - JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS; - boolean publishDataAtJobLevel = state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL, - ConfigurationKeys.DEFAULT_PUBLISH_DATA_AT_JOB_LEVEL); - boolean jobDataPublisherSpecified = - !Strings.isNullOrEmpty(state.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE)); - return jobCommitPolicyIsFull || publishDataAtJobLevel || jobDataPublisherSpecified; - } - private static SharedResourcesBroker createDefaultInstanceBroker(Properties jobProps) { log.warn("Creating a job specific {}. Objects will only be shared at the job level.", SharedResourcesBroker.class.getSimpleName()); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java index 4f509312cc3..25a8c5eb453 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java @@ -17,19 +17,20 @@ package org.apache.gobblin.temporal.ddm.workflow; -import io.temporal.workflow.Promise; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; -import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; +/** + * Workflow for committing the output of work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl} + */ @WorkflowInterface public interface CommitStepWorkflow { /** - * This is the method that is executed when the Workflow Execution is started. The Workflow - * Execution completes when this method finishes execution. + * Commit the output of the work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl} + * Returns the number of workunits committed */ @WorkflowMethod int commit(WUProcessingSpec workSpec); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java index 6049546dee4..2b674ec1979 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java @@ -1,9 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.gobblin.temporal.ddm.workflow.impl; import io.temporal.activity.ActivityOptions; import io.temporal.common.RetryOptions; -import io.temporal.workflow.Async; -import io.temporal.workflow.Promise; import io.temporal.workflow.Workflow; import java.time.Duration; @@ -11,7 +26,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.temporal.ddm.activity.CommitActivity; import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; -import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow; @@ -34,7 +48,6 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow { @Override public int commit(WUProcessingSpec workSpec) { - Promise result = Async.function(activityStub::commit, workSpec); - return result.get(); + return activityStub.commit(workSpec); } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java index eaba17bf388..074bb460972 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/NestingExecOfProcessWorkUnitWorkflowImpl.java @@ -25,7 +25,6 @@ import java.time.Duration; import org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit; -import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck; import org.apache.gobblin.temporal.util.nesting.workflow.AbstractNestingExecWorkflowImpl; diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java index abe7fa3ee20..485602539ec 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java @@ -41,6 +41,7 @@ @Slf4j public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow { public static final String CHILD_WORKFLOW_ID_BASE = "NestingExecWorkUnits"; + public static final String COMMIT_STEP_WORKFLOW_ID_BASE = "CommitStepWorkflow"; @Override public int process(WUProcessingSpec workSpec) { @@ -72,8 +73,10 @@ protected NestingExecWorkflow createProcessingWorkflow(FileS } protected CommitStepWorkflow createCommitStepWorkflow() { - ChildWorkflowOptions childOpts = - ChildWorkflowOptions.newBuilder().setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON).build(); + ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder() + .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON) + .setWorkflowId(Help.qualifyNamePerExec(COMMIT_STEP_WORKFLOW_ID_BASE, WorkerConfig.of(this).orElse(ConfigFactory.empty()))) + .build(); return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts); } From d3161da0e055007a15d30e6d3fe19b788f7f1c1c Mon Sep 17 00:00:00 2001 From: William Lo Date: Wed, 22 Nov 2023 13:31:56 -0500 Subject: [PATCH 5/8] Fix checkstyle --- .../temporal/ddm/activity/impl/CommitActivityImpl.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java index cc6089be161..18666700321 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java @@ -19,14 +19,12 @@ import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.typesafe.config.ConfigFactory; import io.temporal.failure.ApplicationFailure; import java.io.IOException; -import java.net.URI; import java.util.Collection; import java.util.List; import java.util.Map; @@ -47,14 +45,12 @@ import org.apache.gobblin.runtime.SafeDatasetCommit; import org.apache.gobblin.runtime.TaskState; import org.apache.gobblin.runtime.TaskStateCollectorService; -import org.apache.gobblin.source.extractor.JobCommitPolicy; import org.apache.gobblin.temporal.ddm.activity.CommitActivity; import org.apache.gobblin.temporal.ddm.util.JobStateUtils; import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec; import org.apache.gobblin.temporal.ddm.work.assistance.Help; import org.apache.gobblin.util.Either; import org.apache.gobblin.util.ExecutorsUtils; -import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.executors.IteratorExecutor; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; From b89811e7fc594765163cc97c59aea3d86e2642b2 Mon Sep 17 00:00:00 2001 From: William Lo Date: Sun, 10 Dec 2023 00:55:55 -0500 Subject: [PATCH 6/8] Address review --- .../runtime/TaskStateCollectorService.java | 34 ++++++++--------- .../ddm/activity/impl/CommitActivityImpl.java | 37 +++++++++---------- .../ddm/workflow/CommitStepWorkflow.java | 2 +- .../impl/ProcessWorkUnitsWorkflowImpl.java | 7 +++- 4 files changed, 41 insertions(+), 39 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java index baf6d95e37d..b7344bdaa14 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Queue; import java.util.concurrent.Callable; @@ -30,7 +31,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; -import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import com.google.common.collect.Queues; @@ -141,7 +141,7 @@ public TaskStateCollectorService(Properties jobProps, JobState jobState, EventBu throw new RuntimeException("Could not construct TaskCollectorHandler " + handlerTypeName, rfe); } } else { - optionalTaskCollectorHandler = Optional.absent(); + optionalTaskCollectorHandler = Optional.empty(); } isJobProceedOnCollectorServiceFailure = @@ -190,13 +190,13 @@ protected void shutDown() throws Exception { */ private void collectOutputTaskStates() throws IOException { - final Queue taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir, this.stateSerDeRunnerThreads); - if (taskStateQueue == null) { + final Optional> taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir.getName(), this.stateSerDeRunnerThreads); + if (!taskStateQueue.isPresent()) { return; } // Add the TaskStates of completed tasks to the JobState so when the control // returns to the launcher, it sees the TaskStates of all completed tasks. - for (TaskState taskState : taskStateQueue) { + for (TaskState taskState : taskStateQueue.get()) { consumeTaskIssues(taskState); taskState.setJobState(this.jobState); this.jobState.addTaskState(taskState); @@ -212,11 +212,11 @@ private void collectOutputTaskStates() throws IOException { log.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks"); try { - optionalTaskCollectorHandler.get().handle(taskStateQueue); + optionalTaskCollectorHandler.get().handle(taskStateQueue.get()); } catch (Throwable t) { if (isJobProceedOnCollectorServiceFailure) { log.error("Failed to commit dataset while job proceeds", t); - SafeDatasetCommit.setTaskFailureException(taskStateQueue, t); + SafeDatasetCommit.setTaskFailureException(taskStateQueue.get(), t); } else { throw new RuntimeException("Hive Registration as the TaskStateCollectorServiceHandler failed.", t); } @@ -224,21 +224,21 @@ private void collectOutputTaskStates() throws IOException { } // Notify the listeners for the completion of the tasks - this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue))); + this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue.get()))); } /** * Reads in a {@link FsStateStore} folder used to store Task state outputs, and returns a queue of {@link TaskState}s * Task State files are populated by the {@link GobblinMultiTaskAttempt} to record the output of remote concurrent tasks (e.g. MR mappers) * @param taskStateStore - * @param outputTaskStateDir + * @param taskStateTableName * @param numDeserializerThreads - * @return Queue of TaskStates + * @return Queue of TaskStates, optional if no task states are found in the provided state store * @throws IOException */ - public static Queue deserializeTaskStatesFromFolder(StateStore taskStateStore, Path outputTaskStateDir, + public static Optional> deserializeTaskStatesFromFolder(StateStore taskStateStore, String taskStateTableName, int numDeserializerThreads) throws IOException { - List taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate() { + List taskStateNames = taskStateStore.getTableNames(taskStateTableName, new Predicate() { @Override public boolean apply(String input) { return input != null @@ -247,8 +247,8 @@ public boolean apply(String input) { }}); if (taskStateNames == null || taskStateNames.isEmpty()) { - log.warn("No output task state files found in " + outputTaskStateDir); - return null; + log.warn("No output task state files found in " + taskStateTableName); + return Optional.empty(); } final Queue taskStateQueue = Queues.newConcurrentLinkedQueue(); @@ -259,9 +259,9 @@ public boolean apply(String input) { stateSerDeRunner.submitCallable(new Callable() { @Override public Void call() throws Exception { - TaskState taskState = taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0); + TaskState taskState = taskStateStore.getAll(taskStateTableName, taskStateName).get(0); taskStateQueue.add(taskState); - taskStateStore.delete(outputTaskStateDir.getName(), taskStateName); + taskStateStore.delete(taskStateTableName, taskStateName); return null; } }, "Deserialize state for " + taskStateName); @@ -270,7 +270,7 @@ public Void call() throws Exception { log.error("Could not read all task state files due to", ioe); } log.info(String.format("Collected task state of %d completed tasks in %s", taskStateQueue.size(), outputTaskStateDir)); - return taskStateQueue; + return Optional.of(taskStateQueue); } /** diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java index 18666700321..bd2eca31ad9 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java @@ -18,22 +18,21 @@ package org.apache.gobblin.temporal.ddm.activity.impl; import com.google.common.base.Function; -import com.google.common.base.Optional; +import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; -import com.typesafe.config.ConfigFactory; import io.temporal.failure.ApplicationFailure; import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Properties; +import java.util.Optional; +import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; -import org.apache.gobblin.broker.SharedResourcesBrokerFactory; import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; import org.apache.gobblin.broker.iface.SharedResourcesBroker; import org.apache.gobblin.commit.DeliverySemantics; @@ -75,11 +74,14 @@ public int commit(WUProcessingSpec workSpec) { Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName()); log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri()); StateStore taskStateStore = Help.openTaskStateStore(workSpec, fs); - Collection taskStateQueue = - ImmutableList.copyOf( - TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath, numDeserializationThreads)); - commitTaskStates(jobState, taskStateQueue, globalGobblinContext); - return taskStateQueue.size(); + Optional> taskStateQueue = + TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath.getName(), numDeserializationThreads); + if (!taskStateQueue.isPresent()) { + log.error("No task states found at " + jobOutputPath); + return 0; + } + commitTaskStates(jobState, ImmutableList.copyOf(taskStateQueue.get()), globalGobblinContext); + return taskStateQueue.get().size(); } catch (Exception e) { //TODO: IMPROVE GRANULARITY OF RETRIES throw ApplicationFailure.newNonRetryableFailureWithCause( @@ -98,13 +100,16 @@ public int commit(WUProcessingSpec workSpec) { * @param jobContext * @throws IOException */ - void commitTaskStates(State jobState, Collection taskStates, JobContext jobContext) throws IOException { + private void commitTaskStates(State jobState, Collection taskStates, JobContext jobContext) throws IOException { Map datasetStatesByUrns = createDatasetStatesByUrns(taskStates); final boolean shouldCommitDataInJob = JobContext.shouldCommitDataInJob(jobState); final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE; //TODO: Make this configurable final int numCommitThreads = DEFAULT_NUM_COMMIT_THREADS; if (!shouldCommitDataInJob) { + if (Strings.isNullOrEmpty(jobState.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE))) { + log.warn("No data publisher is configured for this job. This can lead to non-atomic commit behavior."); + } log.info("Job will not commit data since data are committed by tasks."); } @@ -129,7 +134,7 @@ public Callable apply(final Map.Entry entry IteratorExecutor.logFailures(result, null, 10); if (!IteratorExecutor.verifyAllSuccessful(result)) { - // TODO: propagate cause of failure + // TODO: propagate cause of failure and determine whether or not this is retryable to throw a non-retryable failure exception String jobName = jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, ""); throw new IOException("Failed to commit dataset state for some dataset(s) of job " + jobName); } @@ -141,7 +146,7 @@ public Callable apply(final Map.Entry entry /** * Organize task states by dataset urns. * @param taskStates - * @return + * @return A map of dataset urns to dataset task states. */ public static Map createDatasetStatesByUrns(Collection taskStates) { Map datasetStatesByUrns = Maps.newHashMap(); @@ -165,12 +170,4 @@ private static String createDatasetUrn(Map datase } return datasetUrn; } - - private static SharedResourcesBroker createDefaultInstanceBroker(Properties jobProps) { - log.warn("Creating a job specific {}. Objects will only be shared at the job level.", - SharedResourcesBroker.class.getSimpleName()); - return SharedResourcesBrokerFactory.createDefaultTopLevelBroker(ConfigFactory.parseProperties(jobProps), - GobblinScopeTypes.GLOBAL.defaultScopeInstance()); - } - } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java index 25a8c5eb453..f6f497027c6 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java @@ -30,7 +30,7 @@ public interface CommitStepWorkflow { /** * Commit the output of the work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl} - * Returns the number of workunits committed + * @return number of workunits committed */ @WorkflowMethod int commit(WUProcessingSpec workSpec); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java index 485602539ec..34647397d29 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java @@ -54,9 +54,14 @@ public int process(WUProcessingSpec workSpec) { if (workunitsProcessed > 0) { CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(); int result = commitWorkflow.commit(workSpec); + if (result == 0) { + log.warn("No work units committed at the job level. They could be committed at a task level."); + } return result; + } else { + log.error("No workunits processed, so no commit will be attempted."); + return 0; } - return workunitsProcessed; } protected Workload createWorkload(WUProcessingSpec workSpec) { From 455826e40761dfa363762ca7bb29dd752a678d5a Mon Sep 17 00:00:00 2001 From: William Lo Date: Mon, 11 Dec 2023 12:19:54 -0500 Subject: [PATCH 7/8] Fix optional --- .../apache/gobblin/runtime/TaskStateCollectorService.java | 6 +++--- .../temporal/ddm/activity/impl/CommitActivityImpl.java | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java index b7344bdaa14..2027d7433cb 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java @@ -209,7 +209,7 @@ private void collectOutputTaskStates() throws IOException { // Finish any additional steps defined in handler on driver level. // Currently implemented handler for Hive registration only. if (optionalTaskCollectorHandler.isPresent()) { - log.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks"); + log.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.get().size() + " tasks"); try { optionalTaskCollectorHandler.get().handle(taskStateQueue.get()); @@ -228,7 +228,7 @@ private void collectOutputTaskStates() throws IOException { } /** - * Reads in a {@link FsStateStore} folder used to store Task state outputs, and returns a queue of {@link TaskState}s + * Reads in a @{@link StateStore} and deserializes all task states found in the provided table name * Task State files are populated by the {@link GobblinMultiTaskAttempt} to record the output of remote concurrent tasks (e.g. MR mappers) * @param taskStateStore * @param taskStateTableName @@ -269,7 +269,7 @@ public Void call() throws Exception { } catch (IOException ioe) { log.error("Could not read all task state files due to", ioe); } - log.info(String.format("Collected task state of %d completed tasks in %s", taskStateQueue.size(), outputTaskStateDir)); + log.info(String.format("Collected task state of %d completed tasks in %s", taskStateQueue.size(), taskStateTableName)); return Optional.of(taskStateQueue); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java index bd2eca31ad9..271609ad3db 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java @@ -128,7 +128,8 @@ public Callable apply(final Map.Entry entry entry.getValue(), false, jobContext); } }).iterator(), numCommitThreads, - ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("Commit-thread-%d"))) + // TODO: Rewrite executorUtils to use java util optional + ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log), com.google.common.base.Optional.of("Commit-thread-%d"))) .executeAndGetResults(); IteratorExecutor.logFailures(result, null, 10); From a45ce09fe612ad895244e4f56d0b00b0da5ccb1e Mon Sep 17 00:00:00 2001 From: William Lo Date: Fri, 15 Dec 2023 18:29:47 -0500 Subject: [PATCH 8/8] Final cleanup, optional cleanup --- .../gobblin/runtime/TaskStateCollectorService.java | 2 +- .../temporal/ddm/activity/impl/CommitActivityImpl.java | 9 +++++---- .../ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java index 2027d7433cb..b7eb436fa0e 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java @@ -228,7 +228,7 @@ private void collectOutputTaskStates() throws IOException { } /** - * Reads in a @{@link StateStore} and deserializes all task states found in the provided table name + * Reads in a {@link StateStore} and deserializes all task states found in the provided table name * Task State files are populated by the {@link GobblinMultiTaskAttempt} to record the output of remote concurrent tasks (e.g. MR mappers) * @param taskStateStore * @param taskStateTableName diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java index 271609ad3db..8874bccd7dc 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java @@ -74,14 +74,15 @@ public int commit(WUProcessingSpec workSpec) { Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName()); log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri()); StateStore taskStateStore = Help.openTaskStateStore(workSpec, fs); - Optional> taskStateQueue = + Optional> taskStateQueueOpt = TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath.getName(), numDeserializationThreads); - if (!taskStateQueue.isPresent()) { + if (!taskStateQueueOpt.isPresent()) { log.error("No task states found at " + jobOutputPath); return 0; } - commitTaskStates(jobState, ImmutableList.copyOf(taskStateQueue.get()), globalGobblinContext); - return taskStateQueue.get().size(); + Queue taskStateQueue = taskStateQueueOpt.get(); + commitTaskStates(jobState, ImmutableList.copyOf(taskStateQueue), globalGobblinContext); + return taskStateQueue.size(); } catch (Exception e) { //TODO: IMPROVE GRANULARITY OF RETRIES throw ApplicationFailure.newNonRetryableFailureWithCause( diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java index 34647397d29..141f220579e 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java @@ -59,7 +59,7 @@ public int process(WUProcessingSpec workSpec) { } return result; } else { - log.error("No workunits processed, so no commit will be attempted."); + log.error("No work units processed, so no commit attempted."); return 0; } }