Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
* limitations under the License.
*/


package org.apache.flink.compiler.plan;

import static org.apache.flink.compiler.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
Expand All @@ -36,7 +35,7 @@
import org.apache.flink.util.Visitor;

/**
*
* A node in the execution, representing a workset iteration (delta iteration).
*/
public class WorksetIterationPlanNode extends DualInputPlanNode implements IterationPlanNode {

Expand Down Expand Up @@ -66,7 +65,7 @@ public WorksetIterationPlanNode(WorksetIterationNode template, String nodeName,
SolutionSetPlanNode solutionSetPlanNode, WorksetPlanNode worksetPlanNode,
PlanNode nextWorkSetPlanNode, PlanNode solutionSetDeltaPlanNode)
{
super(template, nodeName, initialSolutionSet, initialWorkset, DriverStrategy.NONE);
super(template, nodeName, initialSolutionSet, initialWorkset, DriverStrategy.BINARY_NO_OP);
this.solutionSetPlanNode = solutionSetPlanNode;
this.worksetPlanNode = worksetPlanNode;
this.solutionSetDeltaPlanNode = solutionSetDeltaPlanNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down Expand Up @@ -227,6 +228,40 @@ public void testIterationPushingWorkOut() throws Exception {
}
}

@Test
public void testWorksetIterationPipelineBreakerPlacement() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setDegreeOfParallelism(8);

// the workset (input two of the delta iteration) is the same as what is consumed be the successive join
DataSet<Tuple2<Long, Long>> initialWorkset = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());

DataSet<Tuple2<Long, Long>> initialSolutionSet = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());

// trivial iteration, since we are interested in the inputs to the iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = initialSolutionSet.iterateDelta(initialWorkset, 100, 0);

DataSet<Tuple2<Long, Long>> next = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>());

DataSet<Tuple2<Long, Long>> result = iteration.closeWith(next, next);

initialWorkset
.join(result, JoinHint.REPARTITION_HASH_FIRST)
.where(0).equalTo(0)
.print();

Plan p = env.createProgramPlan();
compileNoStats(p);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

// --------------------------------------------------------------------------------------------

public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {

// open a bulk iteration
Expand Down Expand Up @@ -270,6 +305,8 @@ public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long,

}

// --------------------------------------------------------------------------------------------

public static final class Join222 extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;

import com.google.common.base.Preconditions;

/**
* A DataSet represents a collection of elements of the same type.<br/>
* A DataSet can be transformed into another DataSet by applying a transformation as for example
Expand Down Expand Up @@ -847,6 +849,9 @@ public IterativeDataSet<T> iterate(int maxIterations) {
* @see org.apache.flink.api.java.operators.DeltaIteration
*/
public <R> DeltaIteration<T, R> iterateDelta(DataSet<R> workset, int maxIterations, int... keyPositions) {
Preconditions.checkNotNull(workset);
Preconditions.checkNotNull(keyPositions);

Keys.ExpressionKeys<T> keys = new Keys.ExpressionKeys<T>(keyPositions, getType(), false);
return new DeltaIteration<T, R>(getExecutionEnvironment(), getType(), this, workset, keys, maxIterations);
}
Expand Down