diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index 89d1dc41b14..c2bdc592d1a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -543,7 +543,7 @@ public static Optional> getJobDataPublisherClass(
    * Data should be committed by the job if either {@link ConfigurationKeys#JOB_COMMIT_POLICY_KEY} is set to "full",
    * or {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to true.
    */
-  private static boolean shouldCommitDataInJob(State state) {
+  public static boolean shouldCommitDataInJob(State state) {
     boolean jobCommitPolicyIsFull =
         JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
     boolean publishDataAtJobLevel = state.getPropAsBoolean(ConfigurationKeys.PUBLISH_DATA_AT_JOB_LEVEL,
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index a2885c168a9..e07083ff1d3 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -63,7 +63,7 @@
  */
 @RequiredArgsConstructor
 @Slf4j
-final class SafeDatasetCommit implements Callable {
+public final class SafeDatasetCommit implements Callable {
 
   private static final Object GLOBAL_LOCK = new Object();
 
@@ -319,8 +319,8 @@ private void finalizeDatasetStateBeforeCommit(JobState.DatasetState datasetState
         datasetState.setState(JobState.RunningState.FAILED);
         datasetState.incrementJobFailures();
         Optional taskStateException = taskState.getTaskFailureException();
-        log.warn("At least one task did not get committed successfully. Setting dataset state to FAILED. "
-            + (taskStateException.isPresent() ? taskStateException.get() : "Exception not set."));
+        log.warn("Failed task state for {} At least one task did not get committed successfully. Setting dataset state to FAILED. {}" ,
+            taskState.getWorkunit().getOutputFilePath(), taskStateException.isPresent() ? taskStateException.get() : "Exception not set.");
         return;
       }
     }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
index 43164638573..b7eb436fa0e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
@@ -21,6 +21,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Queue;
 import java.util.concurrent.Callable;
@@ -29,10 +30,7 @@
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Queues;
@@ -68,8 +66,6 @@
 @Slf4j
 public class TaskStateCollectorService extends AbstractScheduledService {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(TaskStateCollectorService.class);
-
   private final JobState jobState;
 
   private final EventBus eventBus;
@@ -145,7 +141,7 @@ public TaskStateCollectorService(Properties jobProps, JobState jobState, EventBu
         throw new RuntimeException("Could not construct TaskCollectorHandler " + handlerTypeName, rfe);
       }
     } else {
-      optionalTaskCollectorHandler = Optional.absent();
+      optionalTaskCollectorHandler = Optional.empty();
     }
 
     isJobProceedOnCollectorServiceFailure =
@@ -166,13 +162,13 @@ protected Scheduler scheduler() {
 
   @Override
   protected void startUp() throws Exception {
-    LOGGER.info("Starting the " + TaskStateCollectorService.class.getSimpleName());
+    log.info("Starting the " + TaskStateCollectorService.class.getSimpleName());
     super.startUp();
   }
 
   @Override
   protected void shutDown() throws Exception {
-    LOGGER.info("Stopping the " + TaskStateCollectorService.class.getSimpleName());
+    log.info("Stopping the " + TaskStateCollectorService.class.getSimpleName());
     try {
       runOneIteration();
     } finally {
@@ -193,42 +189,14 @@ protected void shutDown() throws Exception {
    * @throws IOException if it fails to collect the output {@link TaskState}s
    */
   private void collectOutputTaskStates() throws IOException {
-    List taskStateNames = taskStateStore.getTableNames(outputTaskStateDir.getName(), new Predicate() {
-      @Override
-      public boolean apply(String input) {
-        return input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
-            && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
-      }});
 
-    if (taskStateNames == null || taskStateNames.size() == 0) {
-      LOGGER.debug("No output task state files found in " + this.outputTaskStateDir);
+    final Optional> taskStateQueue = deserializeTaskStatesFromFolder(taskStateStore, outputTaskStateDir.getName(), this.stateSerDeRunnerThreads);
+    if (!taskStateQueue.isPresent()) {
       return;
     }
-
-    final Queue taskStateQueue = Queues.newConcurrentLinkedQueue();
-    try (ParallelRunner stateSerDeRunner = new ParallelRunner(this.stateSerDeRunnerThreads, null)) {
-      for (final String taskStateName : taskStateNames) {
-        LOGGER.debug("Found output task state file " + taskStateName);
-        // Deserialize the TaskState and delete the file
-        stateSerDeRunner.submitCallable(new Callable() {
-          @Override
-          public Void call() throws Exception {
-            TaskState taskState = taskStateStore.getAll(outputTaskStateDir.getName(), taskStateName).get(0);
-            taskStateQueue.add(taskState);
-            taskStateStore.delete(outputTaskStateDir.getName(), taskStateName);
-            return null;
-          }
-        }, "Deserialize state for " + taskStateName);
-      }
-    } catch (IOException ioe) {
-      LOGGER.warn("Could not read all task state files.");
-    }
-
-    LOGGER.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size()));
-
     // Add the TaskStates of completed tasks to the JobState so when the control
     // returns to the launcher, it sees the TaskStates of all completed tasks.
-    for (TaskState taskState : taskStateQueue) {
+    for (TaskState taskState : taskStateQueue.get()) {
       consumeTaskIssues(taskState);
       taskState.setJobState(this.jobState);
       this.jobState.addTaskState(taskState);
@@ -241,14 +209,14 @@ public Void call() throws Exception {
     // Finish any additional steps defined in handler on driver level.
     // Currently implemented handler for Hive registration only.
     if (optionalTaskCollectorHandler.isPresent()) {
-      LOGGER.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.size() + " tasks");
+      log.info("Execute Pipelined TaskStateCollectorService Handler for " + taskStateQueue.get().size() + " tasks");
 
       try {
-        optionalTaskCollectorHandler.get().handle(taskStateQueue);
+        optionalTaskCollectorHandler.get().handle(taskStateQueue.get());
       } catch (Throwable t) {
         if (isJobProceedOnCollectorServiceFailure) {
           log.error("Failed to commit dataset while job proceeds", t);
-          SafeDatasetCommit.setTaskFailureException(taskStateQueue, t);
+          SafeDatasetCommit.setTaskFailureException(taskStateQueue.get(), t);
         } else {
           throw new RuntimeException("Hive Registration as the TaskStateCollectorServiceHandler failed.", t);
         }
@@ -256,7 +224,53 @@ public Void call() throws Exception {
     }
 
     // Notify the listeners for the completion of the tasks
-    this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
+    this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue.get())));
+  }
+
+  /**
+   * Reads in a {@link StateStore} and deserializes all task states found in the provided table name; each state file is deleted from the store right after it has been read.
+   * Task State files are populated by the {@link GobblinMultiTaskAttempt} to record the output of remote concurrent tasks (e.g. MR mappers)
+   * @param taskStateStore store that is listed, read from and deleted from
+   * @param taskStateTableName store name handed to {@code getTableNames}, {@code getAll} and {@code delete}; only entries ending in the task-state suffix and not starting with the tmp-file prefix are considered
+   * @param numDeserializerThreads number of threads given to the {@link ParallelRunner} that performs the deserialization
+   * @return Queue of TaskStates, or an empty Optional if no task states are found in the provided state store; an {@link IOException} raised by the parallel read is only logged, so the returned queue may then be incomplete
+   * @throws IOException presumably when the table names cannot be listed (TODO confirm against {@code StateStore#getTableNames}); NOTE(review): the "no files" case is now logged at WARN (was DEBUG) and this method is also called from the periodic collector, confirm the log volume is acceptable
+   */
+  public static Optional> deserializeTaskStatesFromFolder(StateStore taskStateStore, String taskStateTableName,
+      int numDeserializerThreads) throws IOException {
+    List taskStateNames = taskStateStore.getTableNames(taskStateTableName, new Predicate() {
+      @Override
+      public boolean apply(String input) {
+        return input != null
+            && input.endsWith(AbstractJobLauncher.TASK_STATE_STORE_TABLE_SUFFIX)
+            && !input.startsWith(FsStateStore.TMP_FILE_PREFIX);
+      }});
+
+    if (taskStateNames == null || taskStateNames.isEmpty()) {
+      log.warn("No output task state files found in " + taskStateTableName);
+      return Optional.empty();
+    }
+
+    final Queue taskStateQueue = Queues.newConcurrentLinkedQueue();
+    try (ParallelRunner stateSerDeRunner = new ParallelRunner(numDeserializerThreads, null)) {
+      for (final String taskStateName : taskStateNames) {
+        log.debug("Found output task state file " + taskStateName);
+        // Deserialize the TaskState and delete the file
+        stateSerDeRunner.submitCallable(new Callable() {
+          @Override
+          public Void call() throws Exception {
+            TaskState taskState = taskStateStore.getAll(taskStateTableName, taskStateName).get(0);
+            taskStateQueue.add(taskState);
+            taskStateStore.delete(taskStateTableName, taskStateName);
+            return null;
+          }
+        }, "Deserialize state for " + taskStateName);
+      }
+    } catch (IOException ioe) {
+      log.error("Could not read all task state files due to", ioe);
+    }
+    log.info(String.format("Collected task state of %d completed tasks in %s", taskStateQueue.size(), taskStateTableName));
+    return Optional.of(taskStateQueue);
   }
 
   /**
@@ -267,7 +281,7 @@ public Void call() throws Exception {
   private void reportJobProgress(TaskState taskState) {
     String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_SIZE);
     if (stringSize == null) {
-      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      log.warn("Expected to report job progress but work unit byte size property null");
       return;
     }
 
@@ -275,7 +289,7 @@ private void reportJobProgress(TaskState taskState) {
 
     // If progress reporting is enabled, value should be present
     if (!this.jobState.contains(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE)) {
-      LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+      log.warn("Expected to report job progress but total bytes to copy property null");
       return;
     }
     this.totalSizeToCopy = this.jobState.getPropAsLong(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE);
@@ -287,7 +301,7 @@ private void reportJobProgress(TaskState taskState) {
       this.workUnitsCompletedSoFar += 1;
 
       if (this.totalNumWorkUnits == 0) {
-        LOGGER.warn("Expected to report job progress but work units are not countable");
+        log.warn("Expected to report job progress but work units are not countable");
         return;
       }
       newPercentageCopied = this.workUnitsCompletedSoFar / this.totalNumWorkUnits;
@@ -307,7 +321,7 @@ private void reportJobProgress(TaskState taskState) {
       Map progress = new HashMap<>();
 
       progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
-      LOGGER.info("Sending copy progress event with percentage " + percentageToReport + "%");
+      log.info("Sending copy progress event with percentage " + percentageToReport + "%");
       new TimingEvent(this.eventSubmitter, TimingEvent.JOB_COMPLETION_PERCENTAGE).stop(progress);
     }
   }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
new file mode 100644
index 00000000000..1f29a7abbe7
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
@@ -0,0 +1,36 @@
+/*
+ *
Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+
+
+/** Activity for reading the output of work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl} by
+ * reading in a {@link WUProcessingSpec} to determine the location of the output task states */
+@ActivityInterface
+public interface CommitActivity {
+  /**
+   * Commit the output of the work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
+   * @param workSpec spec from which the location of the output task states is determined
+   * @return number of workunits committed
+   */
+  @ActivityMethod
+  int commit(WUProcessingSpec workSpec);
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
new file mode 100644
index 00000000000..8874bccd7dc
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import com.google.common.base.Function;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import io.temporal.failure.ApplicationFailure;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.commit.DeliverySemantics;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.SafeDatasetCommit;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
+import org.apache.gobblin.util.Either;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * {@link CommitActivity} implementation: loads the job state and the task states written for a job, groups the
+ * task states by dataset urn and runs one {@link SafeDatasetCommit} per dataset.
+ */
+@Slf4j
+public class CommitActivityImpl implements CommitActivity {
+
+  /** Default number of threads used to deserialize task states */
+  static final int DEFAULT_NUM_DESERIALIZATION_THREADS = 10;
+  /** Default number of threads used to run the per-dataset commits */
+  static final int DEFAULT_NUM_COMMIT_THREADS = 1;
+
+  /**
+   * Deserializes the task states found for the job described by {@code workSpec} and commits them.
+   * @param workSpec spec used to load the file system, the job state and the task state store
+   * @return number of task states that were read and handed to the commit, or 0 if none were found
+   * @throws ApplicationFailure (non-retryable) wrapping any exception raised while loading or committing
+   */
+  @Override
+  public int commit(WUProcessingSpec workSpec) {
+    // TODO: Make this configurable
+    int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
+    try {
+      FileSystem fs = Help.loadFileSystem(workSpec);
+      JobState jobState = Help.loadJobState(workSpec, fs);
+      SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);
+      JobContext globalGobblinContext = new JobContext(jobState.getProperties(), log, instanceBroker, null);
+      // TODO: Task state dir is a stub with the assumption it is always colocated with the workunits dir (as in the case of MR which generates workunits)
+      Path jobIdParent = new Path(workSpec.getWorkUnitsDir()).getParent();
+      Path jobOutputPath = new Path(new Path(jobIdParent, "output"), jobIdParent.getName());
+      log.info("Output path at: " + jobOutputPath + " with fs at " + fs.getUri());
+      StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+      Optional<Queue<TaskState>> taskStateQueueOpt =
+          TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobOutputPath.getName(), numDeserializationThreads);
+      if (!taskStateQueueOpt.isPresent()) {
+        log.error("No task states found at " + jobOutputPath);
+        return 0;
+      }
+      Queue<TaskState> taskStateQueue = taskStateQueueOpt.get();
+      commitTaskStates(jobState, ImmutableList.copyOf(taskStateQueue), globalGobblinContext);
+      return taskStateQueue.size();
+    } catch (Exception e) {
+      //TODO: IMPROVE GRANULARITY OF RETRIES
+      throw ApplicationFailure.newNonRetryableFailureWithCause(
+          "Failed to commit dataset state for some dataset(s) of job ",
+          // Class#getName yields "java.io.IOException"; Class#toString would prepend "class " to the failure type
+          IOException.class.getName(),
+          new IOException(e)
+      );
+    }
+  }
+
+  /**
+   * Commit task states to the dataset state store.
+   * @param jobState job-level state, consulted for the commit policy, the publisher type and the job name
+   * @param taskStates task states to group by dataset urn and commit
+   * @param jobContext context handed to every {@link SafeDatasetCommit}
+   * @throws IOException if any dataset commit is reported as unsuccessful or the commit is interrupted
+   */
+  private void commitTaskStates(State jobState, Collection<TaskState> taskStates, JobContext jobContext) throws IOException {
+    Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(taskStates);
+    final boolean shouldCommitDataInJob = JobContext.shouldCommitDataInJob(jobState);
+    final DeliverySemantics deliverySemantics = DeliverySemantics.AT_LEAST_ONCE;
+    //TODO: Make this configurable
+    final int numCommitThreads = DEFAULT_NUM_COMMIT_THREADS;
+    if (!shouldCommitDataInJob) {
+      if (Strings.isNullOrEmpty(jobState.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE))) {
+        log.warn("No data publisher is configured for this job. This can lead to non-atomic commit behavior.");
+      }
+      log.info("Job will not commit data since data are committed by tasks.");
+    }
+
+    try {
+      if (!datasetStatesByUrns.isEmpty()) {
+        log.info("Persisting {} dataset urns.", datasetStatesByUrns.size());
+      }
+
+      List<Either<Void, ExecutionException>> result = new IteratorExecutor<>(Iterables
+          .transform(datasetStatesByUrns.entrySet(),
+              new Function<Map.Entry<String, JobState.DatasetState>, Callable<Void>>() {
+                @Nullable
+                @Override
+                public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry) {
+                  return new SafeDatasetCommit(shouldCommitDataInJob, false, deliverySemantics, entry.getKey(),
+                      entry.getValue(), false, jobContext);
+                }
+              }).iterator(), numCommitThreads,
+          // TODO: Rewrite executorUtils to use java util optional
+          ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log), com.google.common.base.Optional.of("Commit-thread-%d")))
+          .executeAndGetResults();
+
+      IteratorExecutor.logFailures(result, null, 10);
+
+      if (!IteratorExecutor.verifyAllSuccessful(result)) {
+        // TODO: propagate cause of failure and determine whether or not this is retryable to throw a non-retryable failure exception
+        String jobName = jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, "");
+        throw new IOException("Failed to commit dataset state for some dataset(s) of job " + jobName);
+      }
+    } catch (InterruptedException exc) {
+      // Restore the interrupt flag so code further up the stack can still observe the interruption
+      Thread.currentThread().interrupt();
+      throw new IOException(exc);
+    }
+  }
+
+  /**
+   * Organize task states by dataset urns.
+   * @param taskStates task states to group; each one increments the task count of its dataset state
+   * @return A map of dataset urns to dataset task states.
+   */
+  public static Map<String, JobState.DatasetState> createDatasetStatesByUrns(Collection<TaskState> taskStates) {
+    Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
+
+    //TODO: handle skipped tasks?
+    for (TaskState taskState : taskStates) {
+      String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
+      datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
+      datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
+    }
+
+    return datasetStatesByUrns;
+  }
+
+  /**
+   * Looks up the dataset urn of {@code taskState} (falling back to the default urn) and, as a side effect,
+   * registers an empty {@link JobState.DatasetState} for that urn in {@code datasetStatesByUrns} if none exists yet.
+   */
+  private static String createDatasetUrn(Map<String, JobState.DatasetState> datasetStatesByUrns, TaskState taskState) {
+    String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN);
+    if (!datasetStatesByUrns.containsKey(datasetUrn)) {
+      JobState.DatasetState datasetState = new JobState.DatasetState();
+      datasetState.setDatasetUrn(datasetUrn);
+      datasetStatesByUrns.put(datasetUrn, datasetState);
+    }
+    return datasetUrn;
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index 95425a64371..88838ca6a88 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.temporal.ddm.launcher;
 
+import io.temporal.client.WorkflowOptions;
 import java.net.URI;
 import java.util.List;
 import java.util.Properties;
@@ -25,7 +26,6 @@
 import lombok.extern.slf4j.Slf4j;
 
 import com.typesafe.config.ConfigFactory;
-import io.temporal.client.WorkflowOptions;
 import org.apache.hadoop.fs.Path;
 
 import org.apache.gobblin.metrics.Tag;
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index 9af6995b509..d425bbc4b15 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -24,7 +24,9 @@
 import io.temporal.worker.WorkerOptions;
 
 import org.apache.gobblin.temporal.cluster.AbstractTemporalWorker;
+import org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
 import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
+import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
 import org.apache.gobblin.temporal.ddm.workflow.impl.NestingExecOfProcessWorkUnitWorkflowImpl;
 import org.apache.gobblin.temporal.ddm.workflow.impl.ProcessWorkUnitsWorkflowImpl;
 
@@ -40,12 +42,12 @@ public WorkFulfillmentWorker(Config config, WorkflowClient workflowClient) {
 
     @Override
     protected Class[] getWorkflowImplClasses() {
-        return new Class[] { ProcessWorkUnitsWorkflowImpl.class, NestingExecOfProcessWorkUnitWorkflowImpl.class };
+        return new Class[] { ProcessWorkUnitsWorkflowImpl.class, NestingExecOfProcessWorkUnitWorkflowImpl.class, CommitStepWorkflowImpl.class };
     }
 
     @Override
     protected Object[] getActivityImplInstances() {
-        return new Object[] { new ProcessWorkUnitImpl() };
+        return new Object[] { new ProcessWorkUnitImpl(), new CommitActivityImpl() };
     }
 
     @Override
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
new file mode 100644
index 00000000000..f6f497027c6
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow;
+
+import io.temporal.workflow.WorkflowInterface;
+import io.temporal.workflow.WorkflowMethod;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+
+
+/**
+ * Workflow for committing the output of work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
+ */
+@WorkflowInterface
+public interface CommitStepWorkflow {
+  /**
+   * Commit the output of the work done by {@link org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl}
+   * @param workSpec spec describing the work whose output is to be committed; see {@link WUProcessingSpec}
+   * @return number of workunits committed
+   */
+  @WorkflowMethod
+  int commit(WUProcessingSpec workSpec);
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
new file mode 100644
index 00000000000..2b674ec1979
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.workflow.impl;
+
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import io.temporal.workflow.Workflow;
+
+import java.time.Duration;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
+import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
+
+
+/** {@link CommitStepWorkflow} implementation that delegates {@link #commit} to a {@link CommitActivity} stub. */
+@Slf4j
+public class CommitStepWorkflowImpl implements CommitStepWorkflow {
+  // NOTE(review): CommitActivityImpl#commit rethrows every exception as a non-retryable ApplicationFailure, so this retry policy presumably only matters for failures raised outside that catch block (e.g. timeouts); confirm
+  private static final RetryOptions ACTIVITY_RETRY_OPTS = RetryOptions.newBuilder()
+      .setInitialInterval(Duration.ofSeconds(3))
+      .setMaximumInterval(Duration.ofSeconds(100))
+      .setBackoffCoefficient(2)
+      .setMaximumAttempts(4)
+      .build();
+
+  private static final ActivityOptions ACTIVITY_OPTS = ActivityOptions.newBuilder()
+      .setStartToCloseTimeout(Duration.ofSeconds(999))
+      .setRetryOptions(ACTIVITY_RETRY_OPTS)
+      .build();
+
+  private final CommitActivity activityStub = Workflow.newActivityStub(CommitActivity.class, ACTIVITY_OPTS); // stub configured with the timeout and retry options above
+
+  @Override
+  public int commit(WUProcessingSpec workSpec) {
+    return activityStub.commit(workSpec); // the workflow result is exactly what the activity returns
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index eafc624096e..141f220579e 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -23,12 +23,14 @@
 import io.temporal.api.enums.v1.ParentClosePolicy;
 import io.temporal.workflow.ChildWorkflowOptions;
 import io.temporal.workflow.Workflow;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.temporal.cluster.WorkerConfig;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.ddm.work.styles.FileSystemJobStateful;
+import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
 import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
 import org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
 import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr;
@@ -36,17 +38,30 @@
 import org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow;
 
 
+@Slf4j // NOTE(review): Temporal documents Workflow.getLogger(...) for replay-aware logging in workflow code; confirm the Lombok logger is intended here
 public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
   public static final String CHILD_WORKFLOW_ID_BASE = "NestingExecWorkUnits";
+  public static final String COMMIT_STEP_WORKFLOW_ID_BASE = "CommitStepWorkflow"; // base name that createCommitStepWorkflow() qualifies per execution
 
   @Override
   public int process(WUProcessingSpec workSpec) {
     Workload workload = createWorkload(workSpec);
     NestingExecWorkflow processingWorkflow = createProcessingWorkflow(workSpec);
-    return processingWorkflow.performWorkload(
+    int workunitsProcessed = processingWorkflow.performWorkload(
         WorkflowAddr.ROOT, workload, 0,
         workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()
     );
+    if (workunitsProcessed > 0) { // the commit step is only attempted when the processing workflow reported work
+      CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
+      int result = commitWorkflow.commit(workSpec);
+      if (result == 0) {
+        log.warn("No work units committed at the job level. They could be committed at a task level.");
+      }
+      return result; // the value returned is now the commit step's count, not workunitsProcessed
+    } else {
+      log.error("No work units processed, so no commit attempted.");
+      return 0;
+    }
   }
 
   protected Workload createWorkload(WUProcessingSpec workSpec) {
@@ -61,4 +76,13 @@ protected NestingExecWorkflow createProcessingWorkflow(FileS
     // TODO: to incorporate multiple different concrete `NestingExecWorkflow` sub-workflows in the same super-workflow... shall we use queues?!?!?
     return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts);
   }
+
+  protected CommitStepWorkflow createCommitStepWorkflow() { // builds the child-workflow stub that process() uses for the commit step
+    ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
+        .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
+        .setWorkflowId(Help.qualifyNamePerExec(COMMIT_STEP_WORKFLOW_ID_BASE, WorkerConfig.of(this).orElse(ConfigFactory.empty()))) // falls back to an empty config when no WorkerConfig is registered for this instance
+        .build();
+
+    return Workflow.newChildWorkflowStub(CommitStepWorkflow.class, childOpts);
+  }
 }