fix GEARPUMP-32, introduce source watermark

This is for early review and contains some example code that will be removed before merge.

Author: manuzhang <[email protected]>

Closes apache#67 from manuzhang/watermark.
manuzhang committed Aug 22, 2016
1 parent 23daf0c commit 529799c
Showing 38 changed files with 256 additions and 328 deletions.
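
The hunks below follow one pattern: source tasks stop driving themselves with ad-hoc control messages (Message("start"), Message("continue"), Message("tick", ...)) and instead self-send a Watermark from org.apache.gearpump.streaming.source, making event-time progress explicit at the source. The sketch below is a hypothetical ExampleSource modeled on the first hunk, not part of the commit; it assumes, as the diffs suggest, that Watermark wraps a java.time.Instant and that the self-sent watermark re-enters onNext to keep the loop running.

import java.time.Instant

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.source.Watermark
import org.apache.gearpump.streaming.task.{Task, TaskContext}

// Hypothetical example written against the new watermark API.
class ExampleSource(taskContext: TaskContext, conf: UserConfig)
  extends Task(taskContext, conf) {
  import taskContext.output

  override def onStart(startTime: Instant): Unit = {
    // Start the loop by advancing the watermark instead of sending Message("start").
    self ! Watermark(Instant.now)
  }

  override def onNext(msg: Message): Unit = {
    val now = Instant.now
    // Emit a record stamped with the current event time ...
    output(new Message("payload", now.toEpochMilli))
    // ... then advance the source watermark; the self-send keeps onNext firing,
    // mirroring the Source example in the first hunk.
    self ! Watermark(now)
  }
}

The persistent-state specs further down replace the CHECKPOINT control message with onWatermarkProgress, and the Kafka source drops the timestamp.filter.class setting for the same reason.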
@@ -22,19 +22,21 @@ import java.time.Instant

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.source.Watermark
import org.apache.gearpump.streaming.task.{Task, TaskContext}

class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
import taskContext.output

override def onStart(startTime: Instant): Unit = {
self ! Message("start")
self ! Watermark(Instant.now)
}

override def onNext(msg: Message): Unit = {
val list = Vector(getClass.getCanonicalName)
output(new Message(list, System.currentTimeMillis))
self ! Message("continue", System.currentTimeMillis())
val now = Instant.now
output(new Message(list, now.toEpochMilli))
self ! Watermark(now)
}
}

@@ -19,6 +19,7 @@ package org.apache.gearpump.streaming.examples.fsio

import java.time.Instant

import org.apache.gearpump.streaming.source.Watermark
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile._
import org.apache.hadoop.io.{SequenceFile, Text}
@@ -64,6 +65,6 @@ class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig)
object SeqFileStreamProducer {
def INPUT_PATH: String = "inputpath"

val Start = Message("start")
val Continue = Message("continue")
val Start = Watermark(Instant.now)
val Continue = Watermark(Instant.now)
}
@@ -24,6 +24,7 @@ import java.util.Random
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.examples.sol.SOLStreamProducer._
import org.apache.gearpump.streaming.source.Watermark
import org.apache.gearpump.streaming.task.{Task, TaskContext}

class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
@@ -39,7 +40,7 @@ class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)

override def onStart(startTime: Instant): Unit = {
prepareRandomMessage
self ! Start
self ! Watermark(Instant.now)
}

private def prepareRandomMessage = {
@@ -62,18 +63,13 @@ class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
val message = messages(rand.nextInt(messages.length))
output(new Message(message, System.currentTimeMillis()))
messageCount = messageCount + 1L
self ! messageSourceMinClock
self ! Watermark(Instant.now)
}

// messageSourceMinClock represent the min clock of the message source
private def messageSourceMinClock: Message = {
Message("tick", System.currentTimeMillis())
}
}

object SOLStreamProducer {
val DEFAULT_MESSAGE_SIZE = 100
// Bytes
val BYTES_PER_MESSAGE = "bytesPerMessage"
val Start = Message("start")
}
@@ -52,14 +52,14 @@ object MessageCountApp extends AkkaApp with ArgumentsParser {

override val options: Array[(String, CLIOption[Any])] = Array(
SOURCE_TASK -> CLIOption[Int]("<how many kafka source tasks>", required = false,
defaultValue = Some(1)),
defaultValue = Some(1)),
COUNT_TASK -> CLIOption("<how many count tasks>", required = false, defaultValue = Some(1)),
SINK_TASK -> CLIOption[Int]("<how many kafka sink tasks>", required = false,
defaultValue = Some(1)),
defaultValue = Some(1)),
SOURCE_TOPIC -> CLIOption[String]("<kafka source topic>", required = true),
SINK_TOPIC -> CLIOption[String]("<kafka sink topic>", required = true),
ZOOKEEPER_CONNECT -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>",
required = true),
required = true),
BROKER_LIST -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true),
DEFAULT_FS -> CLIOption[String]("<name of the default file system, e.g. hdfs://localhost:9000>",
required = true)
@@ -29,13 +29,16 @@ import org.apache.gearpump.streaming.task.TaskContext
class CountProcessor(taskContext: TaskContext, conf: UserConfig)
extends PersistentTask[Int](taskContext, conf) {

private val serializer = new ChillSerializer[Int]

override def persistentState: PersistentState[Int] = {
import com.twitter.algebird.Monoid.intMonoid
new NonWindowState[Int](new AlgebirdMonoid(intMonoid), new ChillSerializer[Int])
new NonWindowState[Int](new AlgebirdMonoid(intMonoid), serializer)
}

override def processMessage(state: PersistentState[Int], message: Message): Unit = {
state.update(message.timestamp, 1)
state.get.foreach(s => taskContext.output(Message(serializer.serialize(s), message.timestamp)))
}
}

@@ -22,6 +22,7 @@ import java.time.Instant

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.source.Watermark
import org.apache.gearpump.streaming.task.{Task, TaskContext}

class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig)
@@ -31,14 +32,14 @@ class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig)
private var num = 0L
override def onStart(startTime: Instant): Unit = {
num = startTime.toEpochMilli
self ! Message("start")
self ! Watermark(startTime)
}

override def onNext(msg: Message): Unit = {
output(Message(num + "", num))
num += 1

import scala.concurrent.duration._
taskContext.scheduleOnce(Duration(1, MILLISECONDS))(self ! Message("next"))
taskContext.scheduleOnce(Duration(1, MILLISECONDS))(self ! Watermark(Instant.ofEpochMilli(num)))
}
}
@@ -66,16 +66,11 @@ class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers {

for (i <- 0L to num) {
count.onNext(Message("", i))
count.state.get shouldBe Some(i + 1)
count.getState.get shouldBe Some(i + 1)
}
// Next checkpoint time is not arrived yet
when(taskContext.upstreamMinClock).thenReturn(0L)
count.onNext(PersistentTask.CHECKPOINT)
appMaster.expectNoMsg(10.milliseconds)

// Time to checkpoint
when(taskContext.upstreamMinClock).thenReturn(num)
count.onNext(PersistentTask.CHECKPOINT)
count.onWatermarkProgress(Instant.ofEpochMilli(num))
// Only the state before checkpoint time is checkpointed
appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, num))
}
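
The spec above also captures the behavioural change for persistent tasks: checkpointing is no longer requested by self-sending PersistentTask.CHECKPOINT and gating on taskContext.upstreamMinClock; the task is instead notified of watermark progress and checkpoints state up to that instant. A minimal usage sketch of the new flow, reusing the names from this spec (illustrative only, not part of the commit):

// State is updated by ordinary messages; the checkpoint clock advances
// once the watermark passes their timestamps.
count.onNext(Message("", 100L))
count.onWatermarkProgress(Instant.ofEpochMilli(100L))
// The task reports the checkpointed clock back to the AppMaster.
appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, 100L))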
@@ -20,6 +20,8 @@ package org.apache.gearpump.streaming.examples.state.processor

import java.time.Instant

import org.apache.gearpump.streaming.source.Watermark

import scala.concurrent.Await
import scala.concurrent.duration.Duration

@@ -49,11 +51,10 @@ class NumberGeneratorProcessorSpec extends WordSpec with Matchers {
val conf = UserConfig.empty
val genNum = new NumberGeneratorProcessor(taskContext, conf)
genNum.onStart(Instant.EPOCH)
mockTaskActor.expectMsgType[Message]
mockTaskActor.expectMsgType[Watermark]

genNum.onNext(Message("next"))
verify(taskContext).output(MockitoMatchers.any[Message])
// mockTaskActor.expectMsgType[Message]

system.terminate()
Await.result(system.whenTerminated, Duration.Inf)
@@ -34,7 +34,6 @@ import org.scalatest.{Matchers, PropSpec}
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.state.api.PersistentTask
import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig, WindowConfig}
import org.apache.gearpump.streaming.task.UpdateCheckpointClock
import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
@@ -68,17 +67,11 @@ class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Match

for (i <- 0L until num) {
windowAverage.onNext(Message("" + data, i))
windowAverage.state.get shouldBe Some(AveragedValue(i + 1, data))
windowAverage.getState.get shouldBe Some(AveragedValue(i + 1, data))
}

// Next checkpoint time is not arrived yet
when(taskContext.upstreamMinClock).thenReturn(0L)
windowAverage.onNext(PersistentTask.CHECKPOINT)
appMaster.expectNoMsg(10.milliseconds)

// Time to checkpoint
when(taskContext.upstreamMinClock).thenReturn(num)
windowAverage.onNext(PersistentTask.CHECKPOINT)
windowAverage.onWatermarkProgress(Instant.ofEpochMilli(num))
appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, num))
}

@@ -21,6 +21,7 @@
import org.apache.gearpump.Message;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.javaapi.Task;
import org.apache.gearpump.streaming.source.Watermark;
import org.apache.gearpump.streaming.task.TaskContext;

import java.time.Instant;
@@ -33,13 +34,9 @@ public Split(TaskContext taskContext, UserConfig userConf) {
super(taskContext, userConf);
}

private Long now() {
return System.currentTimeMillis();
}

@Override
public void onStart(Instant startTime) {
self().tell(new Message("start", now()), self());
self().tell(new Watermark(Instant.now()), self());
}

@Override
@@ -48,8 +45,8 @@ public void onNext(Message msg) {
// Split the TEXT to words
String[] words = TEXT.split(" ");
for (int i = 0; i < words.length; i++) {
context.output(new Message(words[i], now()));
context.output(new Message(words[i], Instant.now().toEpochMilli()));
}
self().tell(new Message("next", now()), self());
self().tell(new Watermark(Instant.now()), self());
}
}
@@ -23,13 +23,14 @@ import java.util.concurrent.TimeUnit

import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.source.Watermark
import org.apache.gearpump.streaming.task.{Task, TaskContext}

class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
import taskContext.output

override def onStart(startTime: Instant): Unit = {
self ! Message("start")
self ! Watermark(Instant.now)
}

override def onNext(msg: Message): Unit = {
@@ -41,7 +42,7 @@ class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext

import scala.concurrent.duration._
taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self !
Message("continue", System.currentTimeMillis()))
Watermark(Instant.now))
}
}

@@ -19,6 +19,8 @@ package org.apache.gearpump.streaming.examples.wordcount

import java.time.Instant

import org.apache.gearpump.streaming.source.Watermark

import scala.concurrent.Await
import scala.concurrent.duration.Duration

@@ -49,7 +51,7 @@ class SplitSpec extends WordSpec with Matchers {
val conf = UserConfig.empty
val split = new Split(taskContext, conf)
split.onStart(Instant.EPOCH)
mockTaskActor.expectMsgType[Message]
mockTaskActor.expectMsgType[Watermark]

val expectedWordCount = Split.TEXT_TO_SPLIT.split( """[\s\n]+""").count(_.nonEmpty)

@@ -26,6 +26,7 @@ import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout
import org.apache.gearpump.experiments.storm.util._
import org.apache.gearpump.streaming.source.Watermark
import org.apache.gearpump.streaming.task._

import scala.concurrent.duration.Duration
@@ -55,7 +56,7 @@ private[storm] class StormProducer(gearpumpSpout: GearpumpSpout,
getCheckpointClock
}
timeoutMillis.foreach(scheduleTimeout)
self ! Message("start")
self ! Watermark(Instant.now)
}

override def onNext(msg: Message): Unit = {
@@ -68,7 +69,7 @@
case _ =>
gearpumpSpout.next(msg)
}
self ! Message("continue")
self ! Watermark(Instant.now)
}

override def receiveUnManagedMessage: Receive = {
@@ -25,6 +25,7 @@ import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.source.Watermark
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest.{Matchers, WordSpec}
@@ -46,7 +47,7 @@ class StormProducerSpec extends WordSpec with Matchers with MockitoSugar {
stormProducer.onStart(startTime)

verify(gearpumpSpout).start(startTime)
taskActor.expectMsg(Message("start"))
taskActor.expectMsgType[Watermark]
}

"pass message to GearpumpBolt onNext" in {
@@ -64,7 +65,7 @@ class StormProducerSpec extends WordSpec with Matchers with MockitoSugar {
stormProducer.onNext(message)

verify(gearpumpSpout).next(message)
taskActor.expectMsg(Message("continue"))
taskActor.expectMsgType[Watermark]

stormProducer.onNext(StormProducer.TIMEOUT)
verify(gearpumpSpout).timeout(timeout)
@@ -21,10 +21,9 @@
import kafka.api.OffsetRequest;
import kafka.common.TopicAndPartition;
import kafka.consumer.ConsumerConfig;
import org.apache.gearpump.streaming.kafka.lib.source.DefaultMessageDecoder;
import org.apache.gearpump.streaming.kafka.lib.source.DefaultKafkaMessageDecoder;
import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient;
import org.apache.gearpump.streaming.kafka.lib.source.grouper.DefaultPartitionGrouper;
import org.apache.gearpump.streaming.source.DefaultTimeStampFilter;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

@@ -87,10 +86,6 @@ public class KafkaConfig extends AbstractConfig implements Serializable {
private static final String MESSAGE_DECODER_CLASS_DOC =
"Message decoder class that implements the <code>MessageDecoder</code> interface.";

public static final String TIMESTAMP_FILTER_CLASS_CONFIG = "timestamp.filter.class";
private static final String TIMESTAMP_FILTER_CLASS_DOC =
"Timestamp filter class that implements the <code>TimeStampFilter</code> interface";

public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
private static final String PARTITION_GROUPER_CLASS_DOC =
"Partition grouper class that implements the <code>KafkaGrouper</code> interface.";
@@ -119,8 +114,9 @@ public class KafkaConfig extends AbstractConfig implements Serializable {
"",
ConfigDef.Importance.HIGH,
GROUP_ID_DOC)
.define(ZOOKEEPER_CONNECT_CONFIG, // required with no default value
.define(ZOOKEEPER_CONNECT_CONFIG,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.HIGH,
ZOOKEEPER_CONNECT_DOC)
.define(REPLICATION_FACTOR_CONFIG,
@@ -131,14 +127,9 @@
REPLICATION_FACTOR_DOC)
.define(MESSAGE_DECODER_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DefaultMessageDecoder.class.getName(),
DefaultKafkaMessageDecoder.class.getName(),
ConfigDef.Importance.MEDIUM,
MESSAGE_DECODER_CLASS_DOC)
.define(TIMESTAMP_FILTER_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DefaultTimeStampFilter.class.getName(),
ConfigDef.Importance.MEDIUM,
TIMESTAMP_FILTER_CLASS_DOC)
.define(PARTITION_GROUPER_CLASS_CONFIG,
ConfigDef.Type.CLASS,
DefaultPartitionGrouper.class.getName(),
@@ -228,7 +219,6 @@ private void removeSourceSpecificConfigs(Properties props) {
props.remove(FETCH_THRESHOLD_CONFIG);
props.remove(PARTITION_GROUPER_CLASS_CONFIG);
props.remove(MESSAGE_DECODER_CLASS_CONFIG);
props.remove(TIMESTAMP_FILTER_CLASS_CONFIG);
props.remove(REPLICATION_FACTOR_CONFIG);
props.remove(CHECKPOINT_STORE_NAME_PREFIX_CONFIG);
}