diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java index 9f0895d0230dd..8d38814ff485f 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.compiler.plan; import static org.apache.flink.compiler.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE; @@ -36,7 +35,7 @@ import org.apache.flink.util.Visitor; /** - * + * A node in the execution, representing a workset iteration (delta iteration). */ public class WorksetIterationPlanNode extends DualInputPlanNode implements IterationPlanNode { @@ -66,7 +65,7 @@ public WorksetIterationPlanNode(WorksetIterationNode template, String nodeName, SolutionSetPlanNode solutionSetPlanNode, WorksetPlanNode worksetPlanNode, PlanNode nextWorkSetPlanNode, PlanNode solutionSetDeltaPlanNode) { - super(template, nodeName, initialSolutionSet, initialWorkset, DriverStrategy.NONE); + super(template, nodeName, initialSolutionSet, initialWorkset, DriverStrategy.BINARY_NO_OP); this.solutionSetPlanNode = solutionSetPlanNode; this.worksetPlanNode = worksetPlanNode; this.solutionSetDeltaPlanNode = solutionSetDeltaPlanNode; diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java index 2be28e4241f0d..2794572e1720d 100644 --- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java @@ -31,6 +31,7 @@ import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.functions.RichJoinFunction; import 
org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; @@ -227,6 +228,40 @@ public void testIterationPushingWorkOut() throws Exception { } } + @Test + public void testWorksetIterationPipelineBreakerPlacement() { + try { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(8); + + // the workset (input two of the delta iteration) is the same as what is consumed by the successive join + DataSet<Tuple2<Long, Long>> initialWorkset = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue()); + + DataSet<Tuple2<Long, Long>> initialSolutionSet = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue()); + + // trivial iteration, since we are interested in the inputs to the iteration + DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = initialSolutionSet.iterateDelta(initialWorkset, 100, 0); + + DataSet<Tuple2<Long, Long>> next = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long, Long>>()); + + DataSet<Tuple2<Long, Long>> result = iteration.closeWith(next, next); + + initialWorkset + .join(result, JoinHint.REPARTITION_HASH_FIRST) + .where(0).equalTo(0) + .print(); + + Plan p = env.createProgramPlan(); + compileNoStats(p); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // -------------------------------------------------------------------------------------------- + public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) { // open a bulk iteration @@ -270,6 +305,8 @@ public static DataSet> doDeltaIteration(DataSet, Tuple2, Tuple2> { @Override diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index 6b768b763227d..a30d4c927634f 100644 --- 
a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -72,6 +72,8 @@ import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; +import com.google.common.base.Preconditions; + /** * A DataSet represents a collection of elements of the same type.
* A DataSet can be transformed into another DataSet by applying a transformation as for example @@ -847,6 +849,9 @@ public IterativeDataSet<T> iterate(int maxIterations) { * @see org.apache.flink.api.java.operators.DeltaIteration */ public <R> DeltaIteration<T, R> iterateDelta(DataSet<R> workset, int maxIterations, int... keyPositions) { + Preconditions.checkNotNull(workset); + Preconditions.checkNotNull(keyPositions); + Keys.ExpressionKeys<T> keys = new Keys.ExpressionKeys<T>(keyPositions, getType(), false); return new DeltaIteration<T, R>(getExecutionEnvironment(), getType(), this, workset, keys, maxIterations); }