From 074ec58a9bf2bd754bb17bd17aac1552a352fa7d Mon Sep 17 00:00:00 2001
From: Kam Kasravi <kamkasravi@yahoo.com>
Date: Wed, 23 Sep 2015 05:53:00 -0700
Subject: [PATCH] Fixes #13 Add an ATK example

---
 .../examples/atk_pipeline/ATKTask.scala       | 27 ++++++++
 .../examples/atk_pipeline/KMeans.scala        |  8 +++
 .../examples/atk_pipeline/PipeLine.scala      | 64 +++++++++++++++++
 .../examples/atk_pipeline/Scoring.scala       | 27 ++++++++
 .../kafka_hbase_pipeline/PipeLine.scala       |  6 +-
 .../kafka_hdfs_pipeline/PipeLine.scala        |  6 +-
 project/Build.scala                           | 68 ++++++++++++++++++-
 .../examples/tap_pipeline/PipeLine.scala      | 13 ++--
 8 files changed, 208 insertions(+), 11 deletions(-)
 create mode 100644 atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/ATKTask.scala
 create mode 100644 atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/KMeans.scala
 create mode 100644 atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/PipeLine.scala
 create mode 100644 atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/Scoring.scala

diff --git a/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/ATKTask.scala b/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/ATKTask.scala
new file mode 100644
index 0000000..6f1d979
--- /dev/null
+++ b/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/ATKTask.scala
@@ -0,0 +1,27 @@
+package io.gearpump.examples.atk_pipeline
+
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.dsl.plan.OpTranslator.HandlerTask
+import io.gearpump.streaming.task.{StartTime, TaskContext}
+
+class ATKTask(taskContext: TaskContext, userConf: UserConfig) extends HandlerTask[Scoring](taskContext, userConf) {
+  val TAR = "trustedanalytics.scoring-engine.archive-tar"
+
+  override def onStart(startTime : StartTime) : Unit = {
+    LOG.info("onStart")
+    val tar = userConf.getString(TAR).getOrElse("foo")
+    handler.load(tar)
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    LOG.info("onNext")
+    handler.score(Array("foo"))
+    //handler.score(read[Array[String]](msg.msg.asInstanceOf[String]))
+  }
+
+}
+
+object ATKTask {
+  implicit val atkTask = classOf[ATKTask]
+}
diff --git a/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/KMeans.scala b/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/KMeans.scala
new file mode 100644
index 0000000..e0ca54f
--- /dev/null
+++ b/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/KMeans.scala
@@ -0,0 +1,8 @@
+package io.gearpump.examples.atk_pipeline
+
+class KMeans extends Scoring {
+
+  override def score(vector: Array[String]): Unit = {
+  }
+
+}
diff --git a/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/PipeLine.scala b/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/PipeLine.scala
new file mode 100644
index 0000000..46e4478
--- /dev/null
+++ b/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/PipeLine.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.examples.atk_pipeline
+
+import akka.actor.ActorSystem
+import io.gearpump.cluster.UserConfig
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import io.gearpump.streaming.dsl.CollectionDataSource
+import io.gearpump.streaming.dsl.plan.OpTranslator.{HandlerTask, SourceTask}
+import io.gearpump.streaming.source.DataSource
+import io.gearpump.streaming.{Processor, StreamApplication}
+import io.gearpump.util.Graph._
+import io.gearpump.util.{AkkaApp, Graph, LogUtil}
+import org.slf4j.Logger
+
+
+object PipeLine extends AkkaApp with ArgumentsParser {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "models"-> CLIOption[String]("<models found in hdfs>", required = false, defaultValue = Some("/user/gearpump/atk/kmeans.tar")),
+    "randomforest"-> CLIOption[String]("<tar file location in hdfs>", required = false, defaultValue = Some("/user/gearpump/atk/randomforest.tar"))
+  )
+
+  def application(config: ParseResult, system: ActorSystem): StreamApplication = {
+    import ATKTask._
+    import SourceTask._
+    implicit val actorSystem = system
+    val TAR = "trustedanalytics.scoring-engine.archive-tar"
+    val appConfig = UserConfig.empty.withString(TAR, config.getString("models"))
+    val sourceProcessor = Processor[HandlerTask,DataSource](new CollectionDataSource[String](Seq("one","two","three")), 1, "Source", UserConfig.empty)
+    val kmeansProcessor = Processor[HandlerTask,Scoring](new KMeans, 1, "ATK", appConfig)
+    val app = StreamApplication("ATKPipeline", Graph(
+      sourceProcessor ~> kmeansProcessor
+    ), UserConfig.empty)
+    app
+  }
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    val context = ClientContext(akkaConf)
+    val appId = context.submit(application(config, context.system))
+    context.close()
+  }
+
+}
+
diff --git a/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/Scoring.scala b/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/Scoring.scala
new file mode 100644
index 0000000..f2ce2d7
--- /dev/null
+++ b/atk-pipeline/src/main/scala/io/gearpump/examples/atk_pipeline/Scoring.scala
@@ -0,0 +1,27 @@
+package io.gearpump.examples.atk_pipeline
+
+import java.io.File
+import java.net.URI
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.trustedanalytics.atk.model.publish.format.ModelPublishFormat
+import org.trustedanalytics.atk.scoring.interfaces.Model
+
+trait Scoring extends java.io.Serializable {
+
+  var model: Option[Model] = None
+
+  def load(tar: String): Unit = {
+    val pt = new Path(tar)
+    val uri = new URI(tar)
+    val hdfsFileSystem: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.FileSystem.get(uri, new Configuration())
+    val tempFilePath = "/tmp/kmeans.tar"
+    val local = new Path(tempFilePath)
+    hdfsFileSystem.copyToLocalFile(false, pt, local)
+    model = Option(ModelPublishFormat.read(new File(tempFilePath), Thread.currentThread().getContextClassLoader))
+  }
+
+  def score(vector: Array[String]): Unit = {}
+
+}
diff --git a/kafka-hbase-pipeline/src/main/scala/io/gearpump/examples/kafka_hbase_pipeline/PipeLine.scala b/kafka-hbase-pipeline/src/main/scala/io/gearpump/examples/kafka_hbase_pipeline/PipeLine.scala
index daef567..e07526a 100644
--- a/kafka-hbase-pipeline/src/main/scala/io/gearpump/examples/kafka_hbase_pipeline/PipeLine.scala
+++ b/kafka-hbase-pipeline/src/main/scala/io/gearpump/examples/kafka_hbase_pipeline/PipeLine.scala
@@ -23,8 +23,9 @@ import com.typesafe.config.ConfigFactory
 import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
+import io.gearpump.streaming.dsl.plan.OpTranslator.{HandlerTask, SourceTask}
 import io.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory}
-import io.gearpump.streaming.source.DataSourceProcessor
+import io.gearpump.streaming.source.DataSource
 import io.gearpump.streaming.{Processor, StreamApplication}
 import io.gearpump.util.Graph._
 import io.gearpump.util.{AkkaApp, Graph, LogUtil}
@@ -44,6 +45,7 @@ object PipeLine extends AkkaApp with ArgumentsParser {
   )
 
   def application(config: ParseResult, system: ActorSystem): StreamApplication = {
+    import SourceTask._
     implicit val actorSystem = system
     import Messages._
     val pipelineString =
@@ -75,7 +77,7 @@ object PipeLine extends AkkaApp with ArgumentsParser {
 
     val offsetStorageFactory = new KafkaStorageFactory(zookeepers, brokers)
     val source = new KafkaSource(topic, zookeepers, offsetStorageFactory)
-    val kafka = DataSourceProcessor(source, 1)
+    val kafka = Processor[HandlerTask,DataSource](source, 1, "KafkaSource", UserConfig.empty)
     val cpuProcessor = Processor[CpuProcessor](processors, "CpuProcessor")
     val memoryProcessor = Processor[MemoryProcessor](processors, "MemoryProcessor")
     val cpuPersistor = Processor[CpuPersistor](persistors, "CpuPersistor")
diff --git a/kafka-hdfs-pipeline/src/main/scala/io/gearpump/examples/kafka_hdfs_pipeline/PipeLine.scala b/kafka-hdfs-pipeline/src/main/scala/io/gearpump/examples/kafka_hdfs_pipeline/PipeLine.scala
index 7de9a88..49ed013 100644
--- a/kafka-hdfs-pipeline/src/main/scala/io/gearpump/examples/kafka_hdfs_pipeline/PipeLine.scala
+++ b/kafka-hdfs-pipeline/src/main/scala/io/gearpump/examples/kafka_hdfs_pipeline/PipeLine.scala
@@ -23,8 +23,9 @@ import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
 import io.gearpump.partitioner.ShufflePartitioner
+import io.gearpump.streaming.dsl.plan.OpTranslator.{HandlerTask, SourceTask}
 import io.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory}
-import io.gearpump.streaming.source.DataSourceProcessor
+import io.gearpump.streaming.source.DataSource
 import io.gearpump.streaming.{Processor, StreamApplication}
 import io.gearpump.util.Graph._
 import io.gearpump.util.{AkkaApp, Graph, LogUtil}
@@ -54,6 +55,7 @@ object PipeLine extends AkkaApp with ArgumentsParser {
   )
 
   def application(config: ParseResult, system: ActorSystem): StreamApplication = {
+    import SourceTask._
     implicit val actorSystem = system
     val readerNum = config.getInt("reader")
     val scorerNum = config.getInt("scorer")
@@ -67,7 +69,7 @@ object PipeLine extends AkkaApp with ArgumentsParser {
 
     val partitioner = new ShufflePartitioner()
     val source = new KafkaSource(topic, zookeepers, offsetStorageFactory)
-    val reader = DataSourceProcessor(source, readerNum)
+    val reader = Processor[HandlerTask,DataSource](source, readerNum, "KafkaSource", UserConfig.empty)
     val scorer = Processor[ScoringTask](scorerNum)
     val writer = Processor[ParquetWriterTask](writerNum)
 
diff --git a/project/Build.scala b/project/Build.scala
index 8a4d6f8..17fb76e 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -21,6 +21,7 @@ object Build extends sbt.Build {
   val travis_deploy = taskKey[Unit]("use this after sbt assembly packArchive, it will rename the package so that travis deploy can find the package.")
   
   val akkaVersion = "2.3.12"
+  val atkVersion = "0.4.3-master-SNAPSHOT"
   val clouderaVersion = "2.6.0-cdh5.4.2"
   val clouderaHBaseVersion = "1.0.0-cdh5.4.2"
   val gearpumpVersion = "0.6.2-SNAPSHOT"
@@ -48,7 +49,8 @@ object Build extends sbt.Build {
           "bintray/non" at "http://dl.bintray.com/non/maven",
           "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos",
           "clockfly" at "http://dl.bintray.com/clockfly/maven",
-          "local maven" at "file://"+Path.userHome.absolutePath+"/.m2/repository"
+          "tap" at "https://maven.trustedanalytics.org/content/repositories/snapshots",
+          "localmaven" at "file://"+Path.userHome.absolutePath+"/.m2/repository"
         ),
         addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.0-M5" cross CrossVersion.full)
     ) ++
@@ -133,7 +135,7 @@ object Build extends sbt.Build {
           new File(packagePath).renameTo(new File(target))
         }
       )
-  ).aggregate(kafka_hdfs_pipeline, kafka_hbase_pipeline, tap_pipeline)
+  ).aggregate(kafka_hdfs_pipeline, kafka_hbase_pipeline, atk_pipeline, tap_pipeline)
 
   lazy val kafka_hdfs_pipeline = Project(
     id = "gearpump-kafka-hdfs-pipeline",
@@ -268,6 +270,68 @@ object Build extends sbt.Build {
       )
   )
 
+  lazy val atk_pipeline = Project(
+    id = "gearpump-atk-pipeline",
+    base = file("atk-pipeline"),
+    settings = commonSettings ++ myAssemblySettings ++
+      Seq(
+        mergeStrategy in assembly := {
+          case PathList("META-INF", "maven","org.slf4j","slf4j-api", ps) if ps.startsWith("pom") => MergeStrategy.discard
+          case x =>
+            val oldStrategy = (mergeStrategy in assembly).value
+            oldStrategy(x)
+        },
+        libraryDependencies ++= Seq(
+          "com.lihaoyi" %% "upickle" % upickleVersion,
+          "com.github.intel-hadoop" %% "gearpump-core" % gearpumpVersion % "provided"
+            exclude("org.fusesource.leveldbjni", "leveldbjni-all"),
+          "com.github.intel-hadoop" %% "gearpump-core" % gearpumpVersion % "test" classifier "tests",
+          "com.github.intel-hadoop" %% "gearpump-streaming" % gearpumpVersion % "provided"
+            exclude("org.fusesource.leveldbjni", "leveldbjni-all"),
+          "com.github.intel-hadoop" %% "gearpump-streaming" % gearpumpVersion % "test" classifier "tests",
+          "com.github.intel-hadoop" %% "gearpump-external-kafka" % gearpumpVersion
+            exclude("org.fusesource.leveldbjni", "leveldbjni-all"),
+          "org.trustedanalytics.atk" % "model-publish-format" % atkVersion,
+          "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.2",
+          "com.julianpeeters" % "avro-scala-macro-annotations_2.11" % "0.9.0",
+          "org.apache.hadoop" % "hadoop-hdfs" % clouderaVersion
+            exclude("org.fusesource.leveldbjni", "leveldbjni-all")
+            exclude("org.mortbay.jetty", "jetty-util")
+            exclude("org.mortbay.jetty", "jetty")
+            exclude("org.apache.htrace", "htrace-core")
+            exclude("tomcat", "jasper-runtime"),
+          "org.apache.hadoop" % "hadoop-yarn-api" % clouderaVersion
+            exclude("org.fusesource.leveldbjni", "leveldbjni-all")
+            exclude("com.google.guava", "guava")
+            exclude("com.google.protobuf", "protobuf-java")
+            exclude("commons-lang", "commons-lang")
+            exclude("org.apache.htrace", "htrace-core")
+            exclude("commons-logging", "commons-logging")
+            exclude("org.apache.hadoop", "hadoop-annotations"),
+          "org.apache.hadoop" % "hadoop-yarn-client" % clouderaVersion
+            exclude("org.fusesource.leveldbjni", "leveldbjni-all")
+            exclude("com.google.guava", "guava")
+            exclude("com.sun.jersey", "jersey-client")
+            exclude("commons-cli", "commons-cli")
+            exclude("commons-lang", "commons-lang")
+            exclude("commons-logging", "commons-logging")
+            exclude("org.apache.htrace", "htrace-core")
+            exclude("log4j", "log4j")
+            exclude("org.apache.hadoop", "hadoop-annotations")
+            exclude("org.mortbay.jetty", "jetty-util")
+            exclude("org.apache.hadoop", "hadoop-yarn-api")
+            exclude("org.apache.hadoop", "hadoop-yarn-common"),
+          "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
+          "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
+          "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test",
+          "org.mockito" % "mockito-core" % mockitoVersion % "test",
+          "junit" % "junit" % junitVersion % "test"
+        ) ++ hadoopDependency,
+        mainClass in (Compile, packageBin) := Some("io.gearpump.examples.atk_pipeline.PipeLine"),
+        target in assembly := baseDirectory.value.getParentFile / "target" / scalaVersionMajor
+      )
+  )
+
   lazy val tap_pipeline = Project(
     id = "gearpump-tap-pipeline",
     base = file("tap-pipeline"),
diff --git a/tap-pipeline/src/main/scala/io/gearpump/examples/tap_pipeline/PipeLine.scala b/tap-pipeline/src/main/scala/io/gearpump/examples/tap_pipeline/PipeLine.scala
index 74610db..5b7d93f 100644
--- a/tap-pipeline/src/main/scala/io/gearpump/examples/tap_pipeline/PipeLine.scala
+++ b/tap-pipeline/src/main/scala/io/gearpump/examples/tap_pipeline/PipeLine.scala
@@ -24,10 +24,11 @@ import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, ParseResult}
 import io.gearpump.external.hbase.HBaseSink
-import io.gearpump.streaming.StreamApplication
+import io.gearpump.streaming.dsl.plan.OpTranslator.HandlerTask
 import io.gearpump.streaming.kafka.{KafkaSource, KafkaStorageFactory}
-import io.gearpump.streaming.sink.DataSinkProcessor
-import io.gearpump.streaming.source.DataSourceProcessor
+import io.gearpump.streaming.sink.DataSink
+import io.gearpump.streaming.source.DataSource
+import io.gearpump.streaming.{Processor, StreamApplication}
 import io.gearpump.tap.TapJsonConfig
 import io.gearpump.util.Graph._
 import io.gearpump.util.{AkkaApp, Graph, LogUtil}
@@ -59,8 +60,10 @@ object PipeLine extends AkkaApp with ArgumentsParser {
     val table = config.getString("table")
     val zookeepers = kafkaconfig.get("zookeepers").get
     val brokers = kafkaconfig.get("brokers").get
-    val source = DataSourceProcessor(new KafkaSource(topic, zookeepers,new KafkaStorageFactory(zookeepers, brokers)), 1)
-    val sink = DataSinkProcessor(new HBaseSink(table, hbaseconfig), 1)
+    val offsetStorageFactory = new KafkaStorageFactory(zookeepers, brokers)
+    val source = new KafkaSource(topic, zookeepers, offsetStorageFactory)
+    val kafka = Processor[HandlerTask,DataSource](source, 1, "KafkaSource", UserConfig.empty)
+    val sink = Processor[HandlerTask,DataSink](new HBaseSink(table, hbaseconfig), 1, "HBaseSink", UserConfig.empty)
     val app = StreamApplication("TAPPipeline", Graph(
-      source ~> sink
+      kafka ~> sink
     ), UserConfig.empty)