Skip to content

Commit

Permalink
merge duplicated edges in Gearpump's DAG
Browse files Browse the repository at this point in the history
  • Loading branch information
huafengw committed Feb 15, 2016
1 parent 44f42f8 commit 1a8e684
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 62 deletions.
3 changes: 3 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@
<s4.version>0.6.0-incubating</s4.version>
<samza.version>0.7.0</samza.version>
<flink.version>0.9.0</flink.version>
<gearpump.version>0.7.5</gearpump.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.5</scala.version>
<slf4j-log4j12.version>1.7.2</slf4j-log4j12.version>
<slf4j-simple.version>1.7.5</slf4j-simple.version>
<maven-surefire-plugin.version>2.18</maven-surefire-plugin.version>
Expand Down
69 changes: 28 additions & 41 deletions samoa-gearpump/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,46 +37,18 @@
</parent>

<repositories>
<repository> <!-- repository for gearpump -->
<id>clojars</id>
<url>http://clojars.org/repo/</url>
</repository>
<repository>
<id>patriknw at bintray</id>
<url>http://dl.bintray.com/patriknw/maven</url>
</repository>
<repository>
<id>maven-repo</id>
<url>http://repo.maven.apache.org/maven2</url>
</repository>
<repository>
<id>maven1-repo</id>
<url>http://repo1.maven.org/maven2</url>
</repository>
<repository>
<id>maven2-repo</id>
<url>http://mvnrepository.com/artifact</url>
</repository>
<repository>
<id>sonatype</id>
<url>https://oss.sonatype.org/content/repositories/releases</url>
</repository>
<repository>
<id>bintray/non</id>
<url>http://dl.bintray.com/non/maven</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
</repository>
<repository>
<id>clockfly</id>
<url>http://dl.bintray.com/clockfly/maven</url>
</repository>
<repository>
<id>vincent</id>
<url>http://dl.bintray.com/fvunicorn/maven</url>
</repository>
<repository>
<id>maven2-repo</id>
<url>http://mvnrepository.com/artifact</url>
</repository>
</repositories>

<dependencies>
Expand All @@ -93,28 +65,43 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.github.intel-hadoop</groupId>
<artifactId>gearpump-streaming_2.11</artifactId>
<version>0.7.4</version>
<artifactId>gearpump-streaming_${scala.binary.version}</artifactId>
<version>${gearpump.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j-log4j12.version}</version>
<scope>test</scope>
</dependency>
<!-- Scala runtime; the version comes from the scala.version property declared in the root pom
     so it cannot drift from the Scala binary version used for the Gearpump artifact. -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile-first</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- SAMOA assembly -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void onStart(StartTime startTime) {
public void onNext(Message msg) {
if (entranceProcessor.hasNext()) {
GearpumpMessage message =
new GearpumpMessage(entranceProcessor.nextEvent(), outputStream.getTargetId());
new GearpumpMessage(entranceProcessor.nextEvent(), outputStream.getTargetId(), outputStream.getScheme());
taskContext.output(new Message(message, System.currentTimeMillis()));
}
self().tell(new Message("continue", System.currentTimeMillis()), self());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,23 @@
*/

import org.apache.samoa.core.ContentEvent;
import org.apache.samoa.utils.PartitioningScheme;

import java.io.Serializable;

public class GearpumpMessage implements Serializable {
private ContentEvent event;
private PartitioningScheme scheme;
private String targetId;

public GearpumpMessage() {
this(null, null);
this(null, null, null);
}

public GearpumpMessage(ContentEvent event, String targetId) {
public GearpumpMessage(ContentEvent event, String targetId, PartitioningScheme scheme) {
this.event = event;
this.targetId = targetId;
this.scheme = scheme;
}

public String getTargetId() {
Expand All @@ -52,4 +55,12 @@ public ContentEvent getEvent() {
public void setEvent(ContentEvent event) {
this.event = event;
}

public PartitioningScheme getScheme() {
return scheme;
}

public void setScheme(PartitioningScheme scheme) {
this.scheme = scheme;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void setScheme(PartitioningScheme scheme) {

/**
 * Wraps {@code event} in a {@link GearpumpMessage} and emits it through the task context,
 * stamped with the current wall-clock time in milliseconds.
 *
 * NOTE(review): this span is taken from a rendered diff without +/- markers, so two
 * declarations of {@code message} appear back to back. Presumably the two-argument
 * constructor call is the removed line and the three-argument call, which also passes
 * this stream's partitioning scheme, is its replacement; confirm against the commit.
 *
 * @param event the content event to send downstream
 */
@Override
public void put(ContentEvent event) {
GearpumpMessage message = new GearpumpMessage(event, targetId);
GearpumpMessage message = new GearpumpMessage(event, targetId, scheme);
taskContext.output(new Message(message, System.currentTimeMillis()));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.apache.samoa.topology.impl.gearpump;

import io.gearpump.Message;
import io.gearpump.partitioner.*;
import scala.collection.immutable.*;

/**
 * Multicast partitioner that chooses the routing strategy per message rather than per edge.
 *
 * <p>Each {@link GearpumpMessage} carries the partitioning scheme of the stream that produced
 * it; this class reads that scheme and delegates to the matching Gearpump partitioner. Because
 * the decision is made per message, one instance can be attached to every edge of the graph.
 */
public class SamoaMessagePartitioner implements MulticastPartitioner {
  private final ShufflePartitioner shufflePartitioner = new ShufflePartitioner();
  private final BroadcastPartitioner broadcastPartitioner = new BroadcastPartitioner();
  private final HashPartitioner hashPartitioner = new HashPartitioner();

  /**
   * Resolves the target partitions for {@code msg} according to the scheme stored in its
   * {@link GearpumpMessage} payload.
   *
   * @param msg Gearpump message whose payload must be a {@link GearpumpMessage}
   * @param partitionNum number of partitions, passed through to the delegate partitioner
   * @param currentPartitionId partition id of the sender, passed through to the shuffle and
   *     hash partitioners
   * @return a Scala list holding the single chosen partition for SHUFFLE and GROUP_BY_KEY, or
   *     whatever the broadcast partitioner returns for BROADCAST; never {@code null}
   * @throws NullPointerException if the payload carries no partitioning scheme
   * @throws IllegalStateException if the scheme is not one of the handled constants
   */
  @Override
  public List<Object> getPartitions(Message msg, int partitionNum, int currentPartitionId) {
    GearpumpMessage message = (GearpumpMessage) msg.msg();
    // Switching on a null enum would throw an NPE with no message; fail with context instead.
    if (message.getScheme() == null) {
      throw new NullPointerException("GearpumpMessage carries no partitioning scheme");
    }
    switch (message.getScheme()) {
      case SHUFFLE:
        return singlePartition(
            shufflePartitioner.getPartition(msg, partitionNum, currentPartitionId));
      case BROADCAST:
        return broadcastPartitioner.getPartitions(msg, partitionNum);
      case GROUP_BY_KEY:
        // TODO: replace HashPartitioner
        return singlePartition(
            hashPartitioner.getPartition(msg, partitionNum, currentPartitionId));
      default:
        // Fail here rather than hand a null partition list back to the caller.
        throw new IllegalStateException(
            "Unsupported partitioning scheme: " + message.getScheme());
    }
  }

  /**
   * Variant without a sender partition; forwards to the three-argument overload with
   * {@code -1} as the current partition id.
   *
   * @param msg Gearpump message whose payload must be a {@link GearpumpMessage}
   * @param partitionNum number of partitions, passed through to the delegate partitioner
   * @return the partitions selected by the three-argument overload
   */
  @Override
  public List<Object> getPartitions(Message msg, int partitionNum) {
    return this.getPartitions(msg, partitionNum, -1);
  }

  /**
   * Builds a one-element Scala immutable list containing {@code partition}.
   *
   * <p>The raw-type cast is confined to this helper: the Scala cons factory is called from Java
   * with the {@code Nil} singleton, which cannot be expressed without an unchecked conversion.
   */
  @SuppressWarnings({"unchecked", "rawtypes"})
  private static List<Object> singlePartition(int partition) {
    return $colon$colon$.MODULE$.apply(Integer.valueOf(partition), (List) Nil$.MODULE$);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,12 @@
* #L%
*/

import io.gearpump.partitioner.BroadcastPartitioner;
import io.gearpump.partitioner.HashPartitioner;
import io.gearpump.partitioner.Partitioner;
import io.gearpump.partitioner.ShufflePartitioner;
import io.gearpump.streaming.Processor;
import io.gearpump.util.Graph;
import org.apache.samoa.topology.AbstractTopology;
import org.apache.samoa.topology.IProcessingItem;
import org.apache.samoa.topology.Stream;
import org.apache.samoa.utils.PartitioningScheme;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -61,25 +57,13 @@ private void buildGraph() {
}

Set<Stream> streams = getStreams();
Partitioner partitioner = new SamoaMessagePartitioner();
for (Stream stream : streams) {
GearpumpStream gearpumpStream = (GearpumpStream) stream;
IProcessingItem sourcePi = gearpumpStream.getSourceProcessingItem();
IProcessingItem targetPi = gearpumpStream.getTargetPi();
Processor sourceProcessor = piToProcessor.get(sourcePi);
Processor targetProcessor = piToProcessor.get(targetPi);
PartitioningScheme scheme = gearpumpStream.getScheme();
Partitioner partitioner = null;
switch (scheme) {
case SHUFFLE:
partitioner = new ShufflePartitioner();
break;
case GROUP_BY_KEY:
partitioner = new HashPartitioner();
break;
case BROADCAST:
partitioner = new BroadcastPartitioner();
break;
}
graph.addEdge(sourceProcessor, partitioner, targetProcessor);
}
}
Expand Down

0 comments on commit 1a8e684

Please sign in to comment.