diff --git a/pom.xml b/pom.xml index b2992f4e..af1204b2 100644 --- a/pom.xml +++ b/pom.xml @@ -148,6 +148,9 @@ 0.6.0-incubating 0.7.0 0.9.0 + 0.7.5 + 2.11 + 2.11.5 1.7.2 1.7.5 2.18 diff --git a/samoa-gearpump/pom.xml b/samoa-gearpump/pom.xml index 38d9d3bc..336fdfd1 100644 --- a/samoa-gearpump/pom.xml +++ b/samoa-gearpump/pom.xml @@ -37,46 +37,18 @@ - - clojars - http://clojars.org/repo/ - - - patriknw at bintray - http://dl.bintray.com/patriknw/maven - - - maven-repo - http://repo.maven.apache.org/maven2 - - - maven1-repo - http://repo1.maven.org/maven2 - - - maven2-repo - http://mvnrepository.com/artifact - sonatype https://oss.sonatype.org/content/repositories/releases - - bintray/non - http://dl.bintray.com/non/maven - - - cloudera - https://repository.cloudera.com/artifactory/cloudera-repos - - - clockfly - http://dl.bintray.com/clockfly/maven - vincent http://dl.bintray.com/fvunicorn/maven + + maven2-repo + http://mvnrepository.com/artifact + @@ -93,28 +65,43 @@ ${project.version} test - com.github.intel-hadoop - gearpump-streaming_2.11 - 0.7.4 + gearpump-streaming_${scala.binary.version} + ${gearpump.version} - org.slf4j slf4j-log4j12 ${slf4j-log4j12.version} test - - org.scala-lang - scala-library - 2.11.5 - + + net.alchim31.maven + scala-maven-plugin + 3.2.2 + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile-first + process-test-resources + + testCompile + + + + maven-assembly-plugin diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/EntranceProcessingItemTask.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/EntranceProcessingItemTask.java index b3e612f6..12cb69d8 100644 --- a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/EntranceProcessingItemTask.java +++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/EntranceProcessingItemTask.java @@ -56,7 +56,7 @@ public void onStart(StartTime startTime) { public void onNext(Message msg) { if (entranceProcessor.hasNext()) { GearpumpMessage message = - new GearpumpMessage(entranceProcessor.nextEvent(), outputStream.getTargetId()); + new GearpumpMessage(entranceProcessor.nextEvent(), outputStream.getTargetId(), outputStream.getScheme()); taskContext.output(new Message(message, System.currentTimeMillis())); } self().tell(new Message("continue", System.currentTimeMillis()), self()); diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpMessage.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpMessage.java index aeca315f..d15f4c43 100644 --- a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpMessage.java +++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpMessage.java @@ -21,20 +21,23 @@ */ import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.utils.PartitioningScheme; import java.io.Serializable; public class GearpumpMessage implements Serializable { private ContentEvent event; + private PartitioningScheme scheme; private String targetId; public GearpumpMessage() { - this(null, null); + this(null, null, null); } - public GearpumpMessage(ContentEvent event, String targetId) { + public GearpumpMessage(ContentEvent event, String targetId, PartitioningScheme scheme) { this.event = event; this.targetId = targetId; + this.scheme = scheme; } public String getTargetId() { @@ -52,4 +55,12 @@ public ContentEvent getEvent() { public void setEvent(ContentEvent event) { this.event = event; } + + public PartitioningScheme getScheme() { + return scheme; + } + + public void setScheme(PartitioningScheme scheme) { + this.scheme = scheme; + } } diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpStream.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpStream.java index b1f15f2a..62860579 100644 --- a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpStream.java +++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/GearpumpStream.java @@ -70,7 +70,7 @@ public void setScheme(PartitioningScheme scheme) { @Override public void put(ContentEvent event) { - GearpumpMessage message = new GearpumpMessage(event, targetId); + GearpumpMessage message = new GearpumpMessage(event, targetId, scheme); taskContext.output(new Message(message, System.currentTimeMillis())); } diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/SamoaMessagePartitioner.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/SamoaMessagePartitioner.java new file mode 100644 index 00000000..0ef26e7e --- /dev/null +++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/SamoaMessagePartitioner.java @@ -0,0 +1,38 @@ +package org.apache.samoa.topology.impl.gearpump; + +import io.gearpump.Message; +import io.gearpump.partitioner.*; +import scala.collection.immutable.*; + +public class SamoaMessagePartitioner implements MulticastPartitioner { + ShufflePartitioner shufflePartitioner = new ShufflePartitioner(); + BroadcastPartitioner broadcastPartitioner = new BroadcastPartitioner(); + HashPartitioner hashPartitioner = new HashPartitioner(); + + @Override + public List getPartitions(Message msg, int partitionNum, int currentPartitionId) { + GearpumpMessage message = (GearpumpMessage) msg.msg(); + Integer partition = -1; + List result = null; + switch (message.getScheme()) { + case SHUFFLE: + partition = shufflePartitioner.getPartition(msg, partitionNum, currentPartitionId); + result = $colon$colon$.MODULE$.apply(partition, (List)Nil$.MODULE$); + break; + case BROADCAST: + result = broadcastPartitioner.getPartitions(msg, partitionNum); + break; + case GROUP_BY_KEY: + //Todo replace HashPartitioner + partition = hashPartitioner.getPartition(msg, partitionNum, currentPartitionId); + result = $colon$colon$.MODULE$.apply(partition, (List)Nil$.MODULE$); + break; + } + return result; + } + + @Override + public List getPartitions(Message msg, int partitionNum) { + return this.getPartitions(msg, partitionNum, -1); + } +} diff --git a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/Topology.java b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/Topology.java index e48f4ff2..9c8b9ca3 100644 --- a/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/Topology.java +++ b/samoa-gearpump/src/main/java/org/apache/samoa/topology/impl/gearpump/Topology.java @@ -20,16 +20,12 @@ * #L% */ -import io.gearpump.partitioner.BroadcastPartitioner; -import io.gearpump.partitioner.HashPartitioner; import io.gearpump.partitioner.Partitioner; -import io.gearpump.partitioner.ShufflePartitioner; import io.gearpump.streaming.Processor; import io.gearpump.util.Graph; import org.apache.samoa.topology.AbstractTopology; import org.apache.samoa.topology.IProcessingItem; import org.apache.samoa.topology.Stream; -import org.apache.samoa.utils.PartitioningScheme; import java.util.HashMap; import java.util.Map; @@ -61,25 +57,13 @@ private void buildGraph() { } Set streams = getStreams(); + Partitioner partitioner = new SamoaMessagePartitioner(); for (Stream stream : streams) { GearpumpStream gearpumpStream = (GearpumpStream) stream; IProcessingItem sourcePi = gearpumpStream.getSourceProcessingItem(); IProcessingItem targetPi = gearpumpStream.getTargetPi(); Processor sourceProcessor = piToProcessor.get(sourcePi); Processor targetProcessor = piToProcessor.get(targetPi); - PartitioningScheme scheme = gearpumpStream.getScheme(); - Partitioner partitioner = null; - switch (scheme) { - case SHUFFLE: - partitioner = new ShufflePartitioner(); - break; - case GROUP_BY_KEY: - partitioner = new HashPartitioner(); - break; - case BROADCAST: - partitioner = new BroadcastPartitioner(); - break; - } graph.addEdge(sourceProcessor, partitioner, targetProcessor); } }