From ff07f83f1c9db2fc3fe7df005da34c188400ca47 Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Wed, 27 Sep 2023 12:03:50 -0700 Subject: [PATCH 01/10] Define `Workload` abstraction for Temporal workflows of unbounded size through sub-workflow nesting --- .../util/nesting/work/SeqBackedWorkSpan.java | 70 +++++++++ .../nesting/work/SeqSliceBackedWorkSpan.java | 71 +++++++++ .../temporal/util/nesting/work/WFAddr.java | 65 ++++++++ .../temporal/util/nesting/work/Workload.java | 57 +++++++ .../AbstractNestingExecWorkflowImpl.java | 143 ++++++++++++++++++ .../nesting/workflow/NestingExecWorkflow.java | 54 +++++++ 6 files changed, 460 insertions(+) create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqBackedWorkSpan.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqSliceBackedWorkSpan.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WFAddr.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqBackedWorkSpan.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqBackedWorkSpan.java new file mode 100644 index 00000000000..51f67b85936 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqBackedWorkSpan.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.util.nesting.work; + +import java.util.Iterator; +import java.util.List; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + + +/** Logical sub-sequence of `WORK_ITEM`s, backed for simplicity's sake by an in-memory collection */ +@NoArgsConstructor +@RequiredArgsConstructor +public class SeqBackedWorkSpan implements Workload.WorkSpan { + + @NonNull + private List elems; + // CAUTION: despite the "warning: @NonNull is meaningless on a primitive @lombok.RequiredArgsConstructor"... + // if removed, no two-arg ctor is generated, so syntax error on `new CollectionBackedTaskSpan(elems, startIndex)` + @NonNull + private int startingIndex; + private transient Iterator statefulDelegatee = null; + + @Override + public int getNumElems() { + return elems.size(); + } + + @Override + public boolean hasNext() { + if (statefulDelegatee == null) { + statefulDelegatee = elems.iterator(); + } + return statefulDelegatee.hasNext(); + } + + @Override + public WORK_ITEM next() { + if (statefulDelegatee == null) { + throw new IllegalStateException("first call `hasNext()`!"); + } + return statefulDelegatee.next(); + } + + @Override + public String toString() { + return getClassNickname() + "(" + startingIndex + "... {+" + getNumElems() + "})"; + } + + protected String getClassNickname() { + // return getClass().getSimpleName(); + return "WorkSpan"; + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqSliceBackedWorkSpan.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqSliceBackedWorkSpan.java new file mode 100644 index 00000000000..ecefaed073a --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqSliceBackedWorkSpan.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.util.nesting.work; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + + +/** Logical sub-sequence of `WORK_ITEM`s, backed for simplicity's sake by an in-memory collection, *SHARED* w/ other work spans */ +@NoArgsConstructor +@RequiredArgsConstructor +public class SeqSliceBackedWorkSpan implements Workload.WorkSpan { + private static final int NOT_SET_SENTINEL = -1; + + @NonNull private WORK_ITEM[] sharedElems; + // CAUTION: despite the "warning: @NonNull is meaningless on a primitive @lombok.RequiredArgsConstructor"... + // if removed, no two-arg ctor is generated, so syntax error on `new CollectionSliceBackedTaskSpan(elems, startIndex)` + @NonNull private int startingIndex; + @NonNull private int numElements; + private transient volatile int nextElemIndex = NOT_SET_SENTINEL; + + @Override + public int getNumElems() { + return getEndingIndex() - startingIndex; + } + + @Override + public boolean hasNext() { + if (nextElemIndex == NOT_SET_SENTINEL) { + nextElemIndex = startingIndex; // NOTE: `startingIndex` should be effectively `final` (post-deser) and always >= 0 + } + return nextElemIndex < this.getEndingIndex(); + } + + @Override + public WORK_ITEM next() { + return sharedElems[nextElemIndex++]; + } + + @Override + public String toString() { + return getClassNickname() + "(" + startingIndex + "... {+" + getNumElems() + "})"; + } + + protected String getClassNickname() { + // return getClass().getSimpleName(); + return "WorkSpan"; + } + + @JsonIgnore // (because no-arg method resembles 'java bean property') + protected final int getEndingIndex() { + return Math.min(startingIndex + numElements, sharedElems.length); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WFAddr.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WFAddr.java new file mode 100644 index 00000000000..68886296ab2 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WFAddr.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.util.nesting.work; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.List; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + + +/** Hierarchical address for nesting workflows (0-based). */ +@NoArgsConstructor // IMPORTANT: for jackson (de)serialization +@RequiredArgsConstructor +public class WFAddr { + public static final String SEP = "."; + + /** initial, top-level address */ + public static final WFAddr ROOT = new WFAddr(0); + + @Getter + @NonNull // IMPORTANT: for jackson (de)serialization (which won't permit `final`) + private List segments; + + public WFAddr(final int firstLevelOnly) { + this(Lists.newArrayList(firstLevelOnly)); + } + + /** @return 0-based depth */ + @JsonIgnore // (because no-arg method resembles 'java bean property') + public int getDepth() { + return segments.size() - 1; + } + + /** Create a child of the current `WFAddr` */ + public WFAddr createChild(int childLevel) { + final List copy = new ArrayList<>(segments); + copy.add(childLevel); + return new WFAddr(copy); + } + + @Override + public String toString() { + return Joiner.on(SEP).join(segments); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java new file mode 100644 index 00000000000..ff467cea486 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.util.nesting.work; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import java.util.Iterator; +import java.util.Optional; + + +/** + * `Workload` models a logical collection of homogenous inputs over which a "foreach" operation can asynchronously apply + * an arbitrary procedure to each element. This encapsulates "processing" the entire collection of sequential + * "work item" specifications by the uniform application of the chosen procedure(s). + * + * Given Temporal's required determinism, the work items and work spans should remain unchanged, with stable sequential + * ordering. This need not constrain `Workload`s to eager, advance elaboration: "streaming" definition is possible, + * so long as producing a deterministic result. + * + * A actual, real-world workload might correspond to datastore contents, such as records serialized into HDFS files + * or ordered DB query results. + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class") // to handle impls +public interface Workload { + + /** + * @return a sequential sub-sequence, from `startIndex` (0-based), unless it falls beyond the underlying sequence + * NOTE: this is a blocking call that forces elaboration: `WorkSpan.getNumElems() < numElements` signifies end of seq + */ + Optional> getSpan(int startIndex, int numElements); + + /** Non-blocking, best-effort advice: to support non-strict elaboration, does NOT guarantee `index` will not exceed */ + boolean isIndexKnownToExceed(int index); + + default boolean isDefiniteSize() { + return false; + } + + /** Logical sub-sequence 'slice' of contiguous work items */ + public interface WorkSpan extends Iterator { + int getNumElems(); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java new file mode 100644 index 00000000000..2173c965a88 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.util.nesting.workflow; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.compress.utils.Lists; + +import io.temporal.api.enums.v1.ParentClosePolicy; +import io.temporal.workflow.Async; +import io.temporal.workflow.ChildWorkflowOptions; +import io.temporal.workflow.Promise; +import io.temporal.workflow.Workflow; + +import org.apache.gobblin.temporal.util.nesting.work.WFAddr; +import org.apache.gobblin.temporal.util.nesting.work.Workload; + + +/** Core skeleton of {@link NestingExecWorkflow}: realizing classes need only define {@link #launchAsyncActivity} */ +@Slf4j +public abstract class AbstractNestingExecWorkflowImpl implements NestingExecWorkflow { + + @Override + public int performWorkload( + final WFAddr addr, + final Workload workload, + final int startIndex, + final int maxBranchesPerTree, + final int maxSubTreesPerTree, + final Optional maxSubTreesForCurrentTreeOverride + ) { + final int maxSubTreesForCurrent = maxSubTreesForCurrentTreeOverride.orElse(maxSubTreesPerTree); + final int maxLeaves = maxBranchesPerTree - maxSubTreesForCurrent; + final Optional> optSpan = workload.getSpan(startIndex, maxLeaves); + log.info("[" + addr + "] " + workload + " w/ start '" + startIndex + "'" + + "; tree (" + maxBranchesPerTree + "/" + maxSubTreesPerTree + "): " + optSpan); + if (!optSpan.isPresent()) { + return 0; + } else { + final Workload.WorkSpan workSpan = optSpan.get(); + final Iterable iterable = () -> workSpan; + final List> childActivities = StreamSupport.stream(iterable.spliterator(), false) + .map(t -> launchAsyncActivity(t)) + .collect(Collectors.toList()); + final List> childSubTrees = new ArrayList<>(); + if (workSpan.getNumElems() == maxLeaves) { // received as many as requested (did not stop short) + int subTreeId = 0; + for (int subTreeChildMaxSubTreesPerTree + : consolidateSubTreeGrandChildren(maxSubTreesForCurrent, maxBranchesPerTree, maxSubTreesPerTree)) { + // CAUTION: calc these *before* incrementing `subTreeId`! + final int childStartIndex = startIndex + maxLeaves + (maxBranchesPerTree * subTreeId); + final int nextChildId = maxLeaves + subTreeId; + final WFAddr childAddr = addr.createChild(nextChildId); + final NestingExecWorkflow child = createChildWorkflow(childAddr); + if (!workload.isIndexKnownToExceed(childStartIndex)) { // best-effort short-circuiting + childSubTrees.add( + Async.function(child::performWorkload, childAddr, workload, childStartIndex, maxBranchesPerTree, + maxSubTreesPerTree, Optional.of(subTreeChildMaxSubTreesPerTree))); + ++subTreeId; + } + } + } + final Promise allActivityChildren = Promise.allOf(childActivities); + allActivityChildren.get(); // ensure all complete prior to counting them in `overallActivitiesRollupCount` + // TODO: determine whether any benefit to unordered `::get` blocking for any next ready (perhaps no difference...) + final int descendantActivitiesRollupCount = childSubTrees.stream().map(Promise::get).reduce(0, (x, y) -> x + y); + // TODO: consider a generalized reduce op for things other than counting! + final int overallActivitiesRollupCount = workSpan.getNumElems() + descendantActivitiesRollupCount; + log.info("[" + addr + "] activites finished coordinating: " + overallActivitiesRollupCount); + return overallActivitiesRollupCount; + } + } + + /** Factory for invoking the specific activity by providing it args via {@link Async::function} */ + protected abstract Promise launchAsyncActivity(WORK_ITEM task); + + protected NestingExecWorkflow createChildWorkflow(final WFAddr childAddr) { + ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder() + .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON) + .setWorkflowId("NestingExecWorkflow-" + childAddr) + .build(); + return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts); + } + + /** + * "right-tilt" sub-tree's grandchildren, so final child gets all grandchildren (vs. constant grandchildren/child) + * i.e. NOT!: + * List naiveUniformity = Collections.nCopies(numSubTreesPerSubTree, numSubTreeChildren); + * @return each sub-tree's desired size, in ascending sub-tree order + */ + protected static List consolidateSubTreeGrandChildren( + final int numSubTreesPerSubTree, + final int numChildrenTotal, + final int numSubTreeChildren + ) { + if (numSubTreesPerSubTree <= 0) { + return Lists.newArrayList(); + } else if (isSqrt(numSubTreeChildren, numChildrenTotal)) { + // redistribute all grandchild sub-trees to pack every grandchild beneath the final child sub-tree + final List grandChildCounts = new ArrayList<>(Collections.nCopies(numSubTreesPerSubTree - 1, 0)); + grandChildCounts.add(numChildrenTotal); + return grandChildCounts; + } else { + final int totalGrandChildSubTrees = numSubTreesPerSubTree * numSubTreeChildren; + final int numTreesWithSolelySubTreeBranches = totalGrandChildSubTrees / numChildrenTotal; + final int numSubTreesRemaining = totalGrandChildSubTrees % numChildrenTotal; + assert (numTreesWithSolelySubTreeBranches == 1 && numSubTreesRemaining == 0) || numTreesWithSolelySubTreeBranches == 0 + : "present limitation: at most one sub-tree may use further branching: (found: numSubTreesPerSubTree: " + + numSubTreesPerSubTree + "; numChildrenTotal: " + numChildrenTotal + " / numSubTreeChildren: " + + numSubTreeChildren + ")"; + final List grandChildCounts = new ArrayList<>(Collections.nCopies(numSubTreesPerSubTree - (numTreesWithSolelySubTreeBranches + 1), 0)); + grandChildCounts.addAll(Collections.nCopies(Math.min(1, numSubTreesPerSubTree - numTreesWithSolelySubTreeBranches), numSubTreesRemaining)); + grandChildCounts.addAll(Collections.nCopies(Math.min(numTreesWithSolelySubTreeBranches, numSubTreesPerSubTree), numChildrenTotal)); + return grandChildCounts; + } + } + + /** @return whether `maxSubTrees` == `Math.sqrt(maxBranches)` */ + private static boolean isSqrt(int maxSubTrees, int maxBranches) { + return maxSubTrees > 0 && maxSubTrees * maxSubTrees == maxBranches; + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java new file mode 100644 index 00000000000..3a4f55287cd --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.util.nesting.workflow; + +import java.util.Optional; + +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +import org.apache.gobblin.temporal.util.nesting.work.WFAddr; +import org.apache.gobblin.temporal.util.nesting.work.Workload; + + +/** + * Process all `WORK_ITEM`s of `workload`, from `startIndex` to the end by creating child workflows, where this and + * descendants should have at most `maxBranchesPerTree`, with at most `maxSubTreesPerTree` of those being child + * workflows. (Non-child-workflow branches being activities.) + * + * The underlying motivation is to create logical workflows of unbounded size, despite Temporal's event history limit + * of 50Ki events; see: https://docs.temporal.io/workflows#event-history + * + * IMPORTANT: `Math.sqrt(maxBranchesPerTree) == maxSubTreesPerTree` provides a good rule-of-thumb; `maxSubTreesPerTree + * should not exceed that. + * + * @param the type of task for which to invoke an appropriate activity + * @param maxSubTreesForCurrentTreeOverride when the current tree should use different max sub-trees than descendants + */ +@WorkflowInterface +public interface NestingExecWorkflow { + @WorkflowMethod + int performWorkload( + WFAddr addr, + Workload workload, + int startIndex, + int maxBranchesPerTree, + int maxSubTreesPerTree, + Optional maxSubTreesForCurrentTreeOverride + ); +} From 82056d938943df16640c95534d94b10d543bfebe Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Wed, 27 Sep 2023 09:17:02 -0700 Subject: [PATCH 02/10] Adjust Gobblin-Temporal configurability for consistency and abstraction --- .../GobblinTemporalConfigurationKeys.java | 9 ++++-- .../cluster/AbstractTemporalWorker.java | 9 +++--- .../cluster/GobblinTemporalTaskRunner.java | 12 ++++---- .../temporal/cluster/TemporalWorker.java | 28 +++++++++++++++++++ .../GobblinTemporalJobScheduler.java | 2 +- 5 files changed, 44 insertions(+), 16 deletions(-) create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/TemporalWorker.java diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java index 63e9adc38d4..f238ffc9b23 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java @@ -30,15 +30,18 @@ public interface GobblinTemporalConfigurationKeys { String PREFIX = "gobblin.temporal."; - String WORKER_CLASS = PREFIX + "worker"; + String WORKER_CLASS = PREFIX + "worker.class"; String DEFAULT_WORKER_CLASS = HelloWorldWorker.class.getName(); String GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace"; String DEFAULT_GOBBLIN_TEMPORAL_NAMESPACE = PREFIX + "namespace"; String GOBBLIN_TEMPORAL_TASK_QUEUE = PREFIX + "task.queue.name"; String DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE = "GobblinTemporalTaskQueue"; - String GOBBLIN_TEMPORAL_JOB_LAUNCHER = PREFIX + "job.launcher"; - String DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER = HelloWorldJobLauncher.class.getName(); + String GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX = PREFIX + "job.launcher."; + String GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS = GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "class"; + String DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS = HelloWorldJobLauncher.class.getName(); + + String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX = GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "arg."; /** * Number of worker processes to spin up per task runner diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java index 856257f28ed..65a75ddd8df 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java @@ -26,8 +26,8 @@ import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; import org.apache.gobblin.util.ConfigUtils; - -public abstract class AbstractTemporalWorker { +/** Basic boilerplate for a temporal "worker" to register its activity and workflow capabilities and listen on a particular queue */ +public abstract class AbstractTemporalWorker implements TemporalWorker { private final WorkflowClient workflowClient; private final String queueName; private final WorkerFactory workerFactory; @@ -44,6 +44,7 @@ public AbstractTemporalWorker(Config cfg, WorkflowClient client) { workerFactory = WorkerFactory.newInstance(workflowClient); } + @Override public void start() { Worker worker = workerFactory.newWorker(queueName); // This Worker hosts both Workflow and Activity implementations. @@ -55,9 +56,7 @@ public void start() { workerFactory.start(); } - /** - * Shuts down the worker. - */ + @Override public void shutdown() { workerFactory.shutdown(); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java index 79cb0baf755..61545f6ed66 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalTaskRunner.java @@ -124,7 +124,7 @@ public class GobblinTemporalTaskRunner implements StandardMetricsBridge { protected final String temporalQueueName; private final boolean isMetricReportingFailureFatal; private final boolean isEventReportingFailureFatal; - private final List workers; + private final List workers; public GobblinTemporalTaskRunner(String applicationName, String applicationId, @@ -234,7 +234,7 @@ public void start() } } - private AbstractTemporalWorker initiateWorker() throws Exception{ + private TemporalWorker initiateWorker() throws Exception { logger.info("Starting Temporal Worker"); String connectionUri = clusterConfig.getString(GobblinTemporalConfigurationKeys.TEMPORAL_CONNECTION_STRING); @@ -246,8 +246,8 @@ private AbstractTemporalWorker initiateWorker() throws Exception{ String workerClassName = ConfigUtils.getString(clusterConfig, GobblinTemporalConfigurationKeys.WORKER_CLASS, GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS); - AbstractTemporalWorker worker = GobblinConstructorUtils.invokeLongestConstructor( - (Class) Class.forName(workerClassName), clusterConfig, client); + TemporalWorker worker = GobblinConstructorUtils.invokeLongestConstructor( + (Class)Class.forName(workerClassName), clusterConfig, client); worker.start(); logger.info("A new worker is started."); return worker; @@ -286,9 +286,7 @@ public synchronized void stop() { this.containerMetrics.get().stopMetricsReporting(); } - for (AbstractTemporalWorker worker : workers) { - worker.shutdown(); - } + workers.forEach(TemporalWorker::shutdown); logger.info("All services are stopped."); diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/TemporalWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/TemporalWorker.java new file mode 100644 index 00000000000..a2b2eb20201 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/TemporalWorker.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.cluster; + +/** Marker interface for a temporal.io "worker", with capability to `start()` and `stop()` */ +public interface TemporalWorker { + + /** Starts the worker */ + void start(); + + /** Shuts down the worker */ + void shutdown(); +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java index 1b1ee14a40d..d93e2a6702a 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/joblauncher/GobblinTemporalJobScheduler.java @@ -180,7 +180,7 @@ public GobblinTemporalJobLauncher buildJobLauncher(Properties jobProps) Class jobLauncherClass = (Class) Class.forName(combinedProps.getProperty( - GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER, GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER)); + GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS, GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS)); return GobblinConstructorUtils.invokeLongestConstructor(jobLauncherClass, combinedProps, this.appWorkDir, this.metadataTags, From 941ee2318eea51c5aa5bae43260d712895743793 Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Thu, 26 Oct 2023 18:50:07 -0700 Subject: [PATCH 03/10] Define `WorkerConfig`, to pass the `TemporalWorker`'s configuration to the workflows and activities it hosts --- .../cluster/AbstractTemporalWorker.java | 19 ++++- .../temporal/cluster/WorkerConfig.java | 77 +++++++++++++++++++ 2 files changed, 95 insertions(+), 1 deletion(-) create mode 100644 gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/WorkerConfig.java diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java index 65a75ddd8df..a3df550252c 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java @@ -17,15 +17,19 @@ package org.apache.gobblin.temporal.cluster; +import java.util.Arrays; + import com.typesafe.config.Config; import io.temporal.client.WorkflowClient; import io.temporal.worker.Worker; import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkerOptions; import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; import org.apache.gobblin.util.ConfigUtils; + /** Basic boilerplate for a temporal "worker" to register its activity and workflow capabilities and listen on a particular queue */ public abstract class AbstractTemporalWorker implements TemporalWorker { private final WorkflowClient workflowClient; @@ -42,11 +46,13 @@ public AbstractTemporalWorker(Config cfg, WorkflowClient client) { // Create a Worker factory that can be used to create Workers that poll specific Task Queues. workerFactory = WorkerFactory.newInstance(workflowClient); + + stashWorkerConfig(cfg); } @Override public void start() { - Worker worker = workerFactory.newWorker(queueName); + Worker worker = workerFactory.newWorker(queueName, createWorkerOptions()); // This Worker hosts both Workflow and Activity implementations. // Workflows are stateful, so you need to supply a type to create instances. worker.registerWorkflowImplementationTypes(getWorkflowImplClasses()); @@ -61,9 +67,20 @@ public void shutdown() { workerFactory.shutdown(); } + protected WorkerOptions createWorkerOptions() { + return null; + } + /** @return workflow types for *implementation* classes (not interface) */ protected abstract Class[] getWorkflowImplClasses(); /** @return activity instances; NOTE: activities must be stateless and thread-safe, so a shared instance is used. */ protected abstract Object[] getActivityImplInstances(); + + private final void stashWorkerConfig(Config cfg) { + // stash in association with... + WorkerConfig.forWorker(this.getClass(), cfg); // the worker itself + Arrays.stream(getWorkflowImplClasses()).forEach(clazz -> WorkerConfig.withImpl(clazz, cfg)); // its workflow impls + Arrays.stream(getActivityImplInstances()).forEach(obj -> WorkerConfig.withImpl(obj.getClass(), cfg)); // its activity impls + } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/WorkerConfig.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/WorkerConfig.java new file mode 100644 index 00000000000..09a07402319 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/WorkerConfig.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.cluster; + +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import lombok.extern.slf4j.Slf4j; + +import com.typesafe.config.Config; + + +/** + * Static / singleton holder to stash the {@link Config} used to construct each kind of {@link org.apache.gobblin.temporal.cluster.TemporalWorker} + * (within the current JVM). Lookup may be by either the {@link Class} of the worker or of any workflow or activity implementation supplied by + * that worker. The objective is to facilitate sharing the worker's Config with workflow and activity implementations (running within that worker). + * + * ATTENTION: for sanity, construct multiple instances of the same worker always with the same {@link Config}. When this is violated, the `Config` + * given to the most-recently constructed worker "wins". + * + * NOTE: the preservation and sharing of {@link Config} is predicated entirely on its immutability. Thank you TypeSafe! + * Storage indexing uses FQ class name, not the {@link Class}, to be independent of classloader. + */ +@Slf4j +public class WorkerConfig { + private static final ConcurrentHashMap configByFQClassName = new ConcurrentHashMap<>(); + + private WorkerConfig() {} + + /** @return whether initialized now (vs. being previously known) */ + public static boolean forWorker(Class workerClass, Config config) { + // return configByFQClassName.put(workerClass.getName(), config) == null; + return storeAs(workerClass.getName(), config); + } + + /** @return whether initialized now (vs. being previously known) */ + public static boolean withImpl(Class workflowOrActivityImplClass, Config config) { + // return configByFQClassName.put(workflowOrActivityImplClass.getName(), config) == null; + return storeAs(workflowOrActivityImplClass.getName(), config); + } + + public static Optional ofWorker(Class workerClass) { + return Optional.ofNullable(configByFQClassName.get(workerClass.getName())); + // return Optional.ofNullable(retrieveBy(workerClass.getClass())); + } + + public static Optional ofImpl(Class workflowOrActivityImplClass) { + return Optional.ofNullable(configByFQClassName.get(workflowOrActivityImplClass.getName())); + // return Optional.ofNullable(retrieveBy(workflowOrActivityImplClass.getClass())); + } + + public static Optional of(Object workflowOrActivityImpl) { + // return ofImpl(workflowOrActivityImpl.getClass()); + return ofImpl(workflowOrActivityImpl.getClass()); + } + + private static boolean storeAs(String className, Config config) { + Config prior = configByFQClassName.put(className, config); + log.info("storing config of {} values as '{}'{}", config.entrySet().size(), className, prior == null ? " (new)" : ""); + return prior == null; + } +} From c05ff36900f16d6c2f8a32fe0f45d9691d4aa770 Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Thu, 26 Oct 2023 19:54:23 -0700 Subject: [PATCH 04/10] Improve javadoc --- .../gobblin/temporal/cluster/AbstractTemporalWorker.java | 2 +- .../org/apache/gobblin/temporal/cluster/TemporalWorker.java | 2 +- .../org/apache/gobblin/temporal/cluster/WorkerConfig.java | 5 ----- .../apache/gobblin/temporal/util/nesting/work/Workload.java | 2 +- .../temporal/util/nesting/workflow/NestingExecWorkflow.java | 2 +- 5 files changed, 4 insertions(+), 9 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java index a3df550252c..0ed6652eb53 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/AbstractTemporalWorker.java @@ -30,7 +30,7 @@ import org.apache.gobblin.util.ConfigUtils; -/** Basic boilerplate for a temporal "worker" to register its activity and workflow capabilities and listen on a particular queue */ +/** Basic boilerplate for a {@link TemporalWorker} to register its activity and workflow capabilities and listen on a particular queue */ public abstract class AbstractTemporalWorker implements TemporalWorker { private final WorkflowClient workflowClient; private final String queueName; diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/TemporalWorker.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/TemporalWorker.java index a2b2eb20201..5474f6421c3 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/TemporalWorker.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/TemporalWorker.java @@ -17,7 +17,7 @@ package org.apache.gobblin.temporal.cluster; -/** Marker interface for a temporal.io "worker", with capability to `start()` and `stop()` */ +/** Marker interface for a temporal.io "worker", with capability to `start()` and `shutdown()` */ public interface TemporalWorker { /** Starts the worker */ diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/WorkerConfig.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/WorkerConfig.java index 09a07402319..6c2ffe6bacc 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/WorkerConfig.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/WorkerConfig.java @@ -44,28 +44,23 @@ private WorkerConfig() {} /** @return whether initialized now (vs. being previously known) */ public static boolean forWorker(Class workerClass, Config config) { - // return configByFQClassName.put(workerClass.getName(), config) == null; return storeAs(workerClass.getName(), config); } /** @return whether initialized now (vs. being previously known) */ public static boolean withImpl(Class workflowOrActivityImplClass, Config config) { - // return configByFQClassName.put(workflowOrActivityImplClass.getName(), config) == null; return storeAs(workflowOrActivityImplClass.getName(), config); } public static Optional ofWorker(Class workerClass) { return Optional.ofNullable(configByFQClassName.get(workerClass.getName())); - // return Optional.ofNullable(retrieveBy(workerClass.getClass())); } public static Optional ofImpl(Class workflowOrActivityImplClass) { return Optional.ofNullable(configByFQClassName.get(workflowOrActivityImplClass.getName())); - // return Optional.ofNullable(retrieveBy(workflowOrActivityImplClass.getClass())); } public static Optional of(Object workflowOrActivityImpl) { - // return ofImpl(workflowOrActivityImpl.getClass()); return ofImpl(workflowOrActivityImpl.getClass()); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java index ff467cea486..239825f7cff 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/Workload.java @@ -23,7 +23,7 @@ /** - * `Workload` models a logical collection of homogenous inputs over which a "foreach" operation can asynchronously apply + * {@link Workload} models a logical collection of homogenous inputs over which a "foreach" operation can asynchronously apply * an arbitrary procedure to each element. This encapsulates "processing" the entire collection of sequential * "work item" specifications by the uniform application of the chosen procedure(s). * diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java index 3a4f55287cd..abae6032145 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java @@ -27,7 +27,7 @@ /** - * Process all `WORK_ITEM`s of `workload`, from `startIndex` to the end by creating child workflows, where this and + * Process all `WORK_ITEM`s of {@link Workload}, from `startIndex` to the end by creating child workflows, where this and * descendants should have at most `maxBranchesPerTree`, with at most `maxSubTreesPerTree` of those being child * workflows. (Non-child-workflow branches being activities.) * From 494dd088b7062ae12409d394d6a90a46a9e393d1 Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Thu, 26 Oct 2023 19:58:26 -0700 Subject: [PATCH 05/10] Javadoc fixup --- .../java/org/apache/gobblin/temporal/cluster/WorkerConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/WorkerConfig.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/WorkerConfig.java index 6c2ffe6bacc..ea581df2505 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/WorkerConfig.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/WorkerConfig.java @@ -26,7 +26,7 @@ /** - * Static / singleton holder to stash the {@link Config} used to construct each kind of {@link org.apache.gobblin.temporal.cluster.TemporalWorker} + * Static holder to stash the {@link Config} used to construct each kind of {@link org.apache.gobblin.temporal.cluster.TemporalWorker} * (within the current JVM). Lookup may be by either the {@link Class} of the worker or of any workflow or activity implementation supplied by * that worker. The objective is to facilitate sharing the worker's Config with workflow and activity implementations (running within that worker). * From 0eb2d8c501b3938085c10a7909a1bdeff41f93a6 Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Thu, 26 Oct 2023 22:31:54 -0700 Subject: [PATCH 06/10] Minor changes --- .../nesting/workflow/AbstractNestingExecWorkflowImpl.java | 5 ++++- .../temporal/util/nesting/workflow/NestingExecWorkflow.java | 4 ++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java index 2173c965a88..6cfd2d5a405 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java @@ -96,9 +96,12 @@ public int performWorkload( protected abstract Promise launchAsyncActivity(WORK_ITEM task); protected NestingExecWorkflow createChildWorkflow(final WFAddr childAddr) { + // preserve the current workflow ID of this parent, but add the (hierarchical) address extension specific to each child + String thisWorkflowId = Workflow.getInfo().getWorkflowId(); + String childWorkflowId = thisWorkflowId.replaceAll("-[^-]+$", "") + "-" + childAddr; ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder() .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON) - .setWorkflowId("NestingExecWorkflow-" + childAddr) + .setWorkflowId(childWorkflowId) .build(); return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts); } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java index abae6032145..d3298610480 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java @@ -29,13 +29,13 @@ /** * Process all `WORK_ITEM`s of {@link Workload}, from `startIndex` to the end by creating child workflows, where this and * descendants should have at most `maxBranchesPerTree`, with at most `maxSubTreesPerTree` of those being child - * workflows. (Non-child-workflow branches being activities.) + * workflows. (Non-child-workflow (terminal) branches are the activity executions.) * * The underlying motivation is to create logical workflows of unbounded size, despite Temporal's event history limit * of 50Ki events; see: https://docs.temporal.io/workflows#event-history * * IMPORTANT: `Math.sqrt(maxBranchesPerTree) == maxSubTreesPerTree` provides a good rule-of-thumb; `maxSubTreesPerTree - * should not exceed that. + * must not exceed that. This enables consolidation, wherein continued expansion occurs only along the tree's right-most edges. * * @param the type of task for which to invoke an appropriate activity * @param maxSubTreesForCurrentTreeOverride when the current tree should use different max sub-trees than descendants From 48e2bf4099b28bf6e866b3c2547046282e1e08b6 Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Fri, 27 Oct 2023 17:23:59 -0700 Subject: [PATCH 07/10] Update per review suggestions --- .../nesting/work/{WFAddr.java => WorkflowAddr.java} | 10 +++++----- .../workflow/AbstractNestingExecWorkflowImpl.java | 8 ++++---- .../util/nesting/workflow/NestingExecWorkflow.java | 5 +++-- 3 files changed, 12 insertions(+), 11 deletions(-) rename gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/{WFAddr.java => WorkflowAddr.java} (89%) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WFAddr.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java similarity index 89% rename from gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WFAddr.java rename to gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java index 68886296ab2..0329d90d9fb 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WFAddr.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java @@ -31,17 +31,17 @@ /** Hierarchical address for nesting workflows (0-based). */ @NoArgsConstructor // IMPORTANT: for jackson (de)serialization @RequiredArgsConstructor -public class WFAddr { +public class WorkflowAddr { public static final String SEP = "."; /** initial, top-level address */ - public static final WFAddr ROOT = new WFAddr(0); + public static final WorkflowAddr ROOT = new WorkflowAddr(0); @Getter @NonNull // IMPORTANT: for jackson (de)serialization (which won't permit `final`) private List segments; - public WFAddr(final int firstLevelOnly) { + public WorkflowAddr(final int firstLevelOnly) { this(Lists.newArrayList(firstLevelOnly)); } @@ -52,10 +52,10 @@ public int getDepth() { } /** Create a child of the current `WFAddr` */ - public WFAddr createChild(int childLevel) { + public WorkflowAddr createChild(int childLevel) { final List copy = new ArrayList<>(segments); copy.add(childLevel); - return new WFAddr(copy); + return new WorkflowAddr(copy); } @Override diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java index 6cfd2d5a405..046b9a4a33d 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java @@ -33,7 +33,7 @@ import io.temporal.workflow.Promise; import io.temporal.workflow.Workflow; -import org.apache.gobblin.temporal.util.nesting.work.WFAddr; +import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr; import org.apache.gobblin.temporal.util.nesting.work.Workload; @@ -43,7 +43,7 @@ public abstract class AbstractNestingExecWorkflowImpl workload, final int startIndex, final int maxBranchesPerTree, @@ -71,7 +71,7 @@ public int performWorkload( // CAUTION: calc these *before* incrementing `subTreeId`! final int childStartIndex = startIndex + maxLeaves + (maxBranchesPerTree * subTreeId); final int nextChildId = maxLeaves + subTreeId; - final WFAddr childAddr = addr.createChild(nextChildId); + final WorkflowAddr childAddr = addr.createChild(nextChildId); final NestingExecWorkflow child = createChildWorkflow(childAddr); if (!workload.isIndexKnownToExceed(childStartIndex)) { // best-effort short-circuiting childSubTrees.add( @@ -95,7 +95,7 @@ public int performWorkload( /** Factory for invoking the specific activity by providing it args via {@link Async::function} */ protected abstract Promise launchAsyncActivity(WORK_ITEM task); - protected NestingExecWorkflow createChildWorkflow(final WFAddr childAddr) { + protected NestingExecWorkflow createChildWorkflow(final WorkflowAddr childAddr) { // preserve the current workflow ID of this parent, but add the (hierarchical) address extension specific to each child String thisWorkflowId = Workflow.getInfo().getWorkflowId(); String childWorkflowId = thisWorkflowId.replaceAll("-[^-]+$", "") + "-" + childAddr; diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java index d3298610480..3a6661d0907 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/NestingExecWorkflow.java @@ -22,7 +22,7 @@ import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; -import org.apache.gobblin.temporal.util.nesting.work.WFAddr; +import org.apache.gobblin.temporal.util.nesting.work.WorkflowAddr; import org.apache.gobblin.temporal.util.nesting.work.Workload; @@ -42,9 +42,10 @@ */ @WorkflowInterface public interface NestingExecWorkflow { + /** @return the number of workload elements processed cumulatively by this Workflow and its children */ @WorkflowMethod int performWorkload( - WFAddr addr, + WorkflowAddr addr, Workload workload, int startIndex, int maxBranchesPerTree, From cdccabbb6843a7553eec4698c84d14f39518f62f Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Sat, 28 Oct 2023 03:23:15 -0700 Subject: [PATCH 08/10] Insert pause, to spread the load on the temporal server, before launch of each child workflow that may have direct leaves of its own --- .../AbstractNestingExecWorkflowImpl.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java index 046b9a4a33d..425d6284c37 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java @@ -17,6 +17,7 @@ package org.apache.gobblin.temporal.util.nesting.workflow; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -40,6 +41,8 @@ /** Core skeleton of {@link NestingExecWorkflow}: realizing classes need only define {@link #launchAsyncActivity} */ @Slf4j public abstract class AbstractNestingExecWorkflowImpl implements NestingExecWorkflow { + public static final int NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT = 10; + public static final int MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT = 100; @Override public int performWorkload( @@ -74,6 +77,12 @@ public int performWorkload( final WorkflowAddr childAddr = addr.createChild(nextChildId); final NestingExecWorkflow child = createChildWorkflow(childAddr); if (!workload.isIndexKnownToExceed(childStartIndex)) { // best-effort short-circuiting + // IMPORTANT: insert pause before launch of each child workflow that may have direct leaves of its own. periodic pauses spread the load on the + // temporal server, to avoid a sustained burst from submitting potentially very many async activities over the full hierarchical elaboration + final int numDirectLeavesChildMayHave = maxBranchesPerTree - subTreeChildMaxSubTreesPerTree; + if (numDirectLeavesChildMayHave > 0) { + Workflow.sleep(calcPauseDurationBeforeCreatingSubTree(numDirectLeavesChildMayHave)); + } childSubTrees.add( Async.function(child::performWorkload, childAddr, workload, childStartIndex, maxBranchesPerTree, maxSubTreesPerTree, Optional.of(subTreeChildMaxSubTreesPerTree))); @@ -106,6 +115,14 @@ protected NestingExecWorkflow createChildWorkflow(final WorkflowAddr return Workflow.newChildWorkflowStub(NestingExecWorkflow.class, childOpts); } + /** @return how long to pause prior to creating a child workflow, based on `numDirectLeavesChildMayHave` */ + protected Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChildMayHave) { + // (only pause when an appreciable number of leaves) + return numDirectLeavesChildMayHave > MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT + ? Duration.ofSeconds(NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT) + : Duration.ZERO; + } + /** * "right-tilt" sub-tree's grandchildren, so final child gets all grandchildren (vs. constant grandchildren/child) * i.e. NOT!: From 105de787e041d352342c0040c02c0a08558e6e4b Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Mon, 30 Oct 2023 14:02:09 -0700 Subject: [PATCH 09/10] Appease findbugs by having `SeqSliceBackedWorkSpan::next` throw `NoSuchElementException` --- .../temporal/util/nesting/work/SeqSliceBackedWorkSpan.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqSliceBackedWorkSpan.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqSliceBackedWorkSpan.java index ecefaed073a..8b649ac51d5 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqSliceBackedWorkSpan.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/SeqSliceBackedWorkSpan.java @@ -17,10 +17,11 @@ package org.apache.gobblin.temporal.util.nesting.work; -import com.fasterxml.jackson.annotation.JsonIgnore; +import java.util.NoSuchElementException; import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.RequiredArgsConstructor; +import com.fasterxml.jackson.annotation.JsonIgnore; /** Logical sub-sequence of `WORK_ITEM`s, backed for simplicity's sake by an in-memory collection, *SHARED* w/ other work spans */ @@ -51,6 +52,9 @@ public boolean hasNext() { @Override public WORK_ITEM next() { + if (nextElemIndex >= startingIndex + numElements) { + throw new NoSuchElementException("index " + nextElemIndex + " >= " + startingIndex + " + " + numElements); + } return sharedElems[nextElemIndex++]; } From d9878f04c62c41f0dbff2a78ab491c5bdcc87fc0 Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Mon, 30 Oct 2023 14:20:37 -0700 Subject: [PATCH 10/10] Add comment --- .../util/nesting/workflow/AbstractNestingExecWorkflowImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java index 425d6284c37..0dcf19a779d 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/workflow/AbstractNestingExecWorkflowImpl.java @@ -118,6 +118,7 @@ protected NestingExecWorkflow createChildWorkflow(final WorkflowAddr /** @return how long to pause prior to creating a child workflow, based on `numDirectLeavesChildMayHave` */ protected Duration calcPauseDurationBeforeCreatingSubTree(int numDirectLeavesChildMayHave) { // (only pause when an appreciable number of leaves) + // TODO: use a configuration value, for simpler adjustment, rather than hard-code return numDirectLeavesChildMayHave > MAX_CHILD_SUB_TREE_LEAVES_BEFORE_SHOULD_PAUSE_DEFAULT ? Duration.ofSeconds(NUM_SECONDS_TO_PAUSE_BEFORE_CREATING_SUB_TREE_DEFAULT) : Duration.ZERO;