diff --git a/cluster/src/main/java/org/apache/spark/sql/JavaSnappySQLJob.java b/cluster/src/main/java/org/apache/spark/sql/JavaSnappySQLJob.java deleted file mode 100644 index 98b83fb62a..0000000000 --- a/cluster/src/main/java/org/apache/spark/sql/JavaSnappySQLJob.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (c) 2016 SnappyData, Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you - * may not use this file except in compliance with the License. You - * may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. See accompanying - * LICENSE file. - */ -package org.apache.spark.sql; - -import com.typesafe.config.Config; -import spark.jobserver.SparkJobValidation; - -/** - * This class acts as a helper class to implement Java jobs in Spark job server with - * SnappyContext implicitly provided. - */ -public abstract class JavaSnappySQLJob implements SnappySQLJob { - - abstract public Object runJavaJob(SnappyContext snc, Config jobConfig); - - abstract public JSparkJobValidation isValidJob(SnappyContext snc, Config jobConfig); - - @Override - public Object runJob(Object sc, Config jobConfig) { - return runJavaJob((SnappyContext)sc, jobConfig); - } - - @Override - public SparkJobValidation validate(Object sc, Config config) { - JSparkJobValidation status = isValidJob((SnappyContext)sc, config); - return JavaJobValidate.validate(status); - } -} diff --git a/cluster/src/main/java/org/apache/spark/streaming/JavaSnappyStreamingJob.java b/cluster/src/main/java/org/apache/spark/streaming/JavaSnappyStreamingJob.java index 9f832dea65..3d5f0127ef 100644 --- a/cluster/src/main/java/org/apache/spark/streaming/JavaSnappyStreamingJob.java +++ b/cluster/src/main/java/org/apache/spark/streaming/JavaSnappyStreamingJob.java @@ -18,28 +18,29 @@ import com.typesafe.config.Config; -import org.apache.spark.sql.JSparkJobValidation; -import org.apache.spark.sql.JavaJobValidate; -import org.apache.spark.sql.streaming.SnappyStreamingJob; +import org.apache.spark.sql.SnappyJobValidate; +import org.apache.spark.sql.SnappyJobValidation; import org.apache.spark.streaming.api.java.JavaSnappyStreamingContext; + +import spark.jobserver.SparkJobBase; import spark.jobserver.SparkJobValidation; +import org.apache.spark.util.Utils; -public abstract class JavaSnappyStreamingJob implements SnappyStreamingJob { +public abstract class JavaSnappyStreamingJob implements SparkJobBase { - abstract public Object runJavaJob(JavaSnappyStreamingContext snc, Config jobConfig); + abstract public Object runSnappyJob(JavaSnappyStreamingContext snc, Config jobConfig); - abstract public JSparkJobValidation isValidJob(JavaSnappyStreamingContext snc, + abstract public SnappyJobValidation isValidJob(JavaSnappyStreamingContext snc, Config jobConfig); @Override - public Object runJob(Object sc, Config jobConfig) { - return runJavaJob(new JavaSnappyStreamingContext((SnappyStreamingContext)sc), jobConfig); + final public SparkJobValidation validate(Object sc, Config config) { + return SnappyJobValidate.validate(isValidJob(new JavaSnappyStreamingContext((SnappyStreamingContext)sc), config)); } @Override - public SparkJobValidation validate(Object 
sc, Config config) { - JSparkJobValidation status = - isValidJob(new JavaSnappyStreamingContext((SnappyStreamingContext)sc), config); - return JavaJobValidate.validate(status); + final public Object runJob(Object sc, Config jobConfig) { + return runSnappyJob(new JavaSnappyStreamingContext((SnappyStreamingContext)sc), jobConfig); } -} + +} \ No newline at end of file diff --git a/cluster/src/main/scala/org/apache/spark/sql/SnappyContextFactory.scala b/cluster/src/main/scala/org/apache/spark/sql/SnappyContextFactory.scala index ca792fb97a..5b02a82474 100644 --- a/cluster/src/main/scala/org/apache/spark/sql/SnappyContextFactory.scala +++ b/cluster/src/main/scala/org/apache/spark/sql/SnappyContextFactory.scala @@ -19,28 +19,10 @@ package org.apache.spark.sql import com.typesafe.config.Config import io.snappydata.impl.LeadImpl import spark.jobserver.context.SparkContextFactory -import spark.jobserver.{SparkJobValid, SparkJobInvalid, SparkJobValidation, SparkJob, ContextLike, SparkJobBase} +import spark.jobserver.{ContextLike, SparkJobBase, SparkJobInvalid, SparkJobValid, SparkJobValidation} -import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.streaming.api.java.JavaDStream -import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.SparkConf -trait SnappySQLJob extends SparkJobBase { - type C = SnappyContext -} - -object JavaJobValidate{ - def validate(status :JSparkJobValidation) : SparkJobValidation ={ - status match { - case j : JSparkJobValid => SparkJobValid - case j : JSparkJobInvalid => SparkJobInvalid(j.reason) - case _ => SparkJobInvalid("isValid method is not correct") - } - } -} -trait JSparkJobValidation -case class JSparkJobValid extends JSparkJobValidation -case class JSparkJobInvalid(reason : String) extends JSparkJobValidation class SnappyContextFactory extends SparkContextFactory { @@ -61,9 +43,42 @@ object SnappyContextFactory { new SnappyContext(snappyContextLike.sparkContext, snappyContextLike.listener, false) with ContextLike { - override def isValidJob(job: SparkJobBase): Boolean = job.isInstanceOf[SnappySQLJob] - override def stop(): Unit = { - // not stopping anything here because SQLContext doesn't have one. + override def isValidJob(job: SparkJobBase): Boolean = job.isInstanceOf[SnappySQLJob] + override def stop(): Unit = { + // not stopping anything here because SQLContext doesn't have one. 
+ } + } +} + + + +abstract class SnappySQLJob extends SparkJobBase { + type C = Any + + final override def validate(sc: C, config: Config): SparkJobValidation = { + SnappyJobValidate.validate(isValidJob(sc.asInstanceOf[SnappyContext], config)) + } + + final override def runJob(sc: C, jobConfig: Config): Any = { + runSnappyJob(sc.asInstanceOf[SnappyContext], jobConfig) + } + + def isValidJob(sc: SnappyContext, config: Config): SnappyJobValidation + + def runSnappyJob(sc: SnappyContext, jobConfig: Config): Any + +} + +object SnappyJobValidate { + def validate(status: SnappyJobValidation): SparkJobValidation = { + status match { + case _: SnappyJobValid => SparkJobValid + case j: SnappyJobInvalid => SparkJobInvalid(j.reason) + case _ => SparkJobInvalid("isValid method is not correct") } } } + +trait SnappyJobValidation +case class SnappyJobValid() extends SnappyJobValidation +case class SnappyJobInvalid(reason: String) extends SnappyJobValidation \ No newline at end of file diff --git a/cluster/src/main/scala/org/apache/spark/sql/streaming/SnappyStreamingContextFactory.scala b/cluster/src/main/scala/org/apache/spark/sql/streaming/SnappyStreamingContextFactory.scala index c4c0bdea9d..1ecae52a7a 100644 --- a/cluster/src/main/scala/org/apache/spark/sql/streaming/SnappyStreamingContextFactory.scala +++ b/cluster/src/main/scala/org/apache/spark/sql/streaming/SnappyStreamingContextFactory.scala @@ -19,13 +19,26 @@ package org.apache.spark.sql.streaming import com.typesafe.config.{ConfigException, Config} import io.snappydata.impl.LeadImpl import spark.jobserver.context.SparkContextFactory -import spark.jobserver.{ContextLike, SparkJobBase} +import spark.jobserver.{SparkJobValidation, ContextLike, SparkJobBase} import org.apache.spark.SparkConf -import org.apache.spark.streaming.{Milliseconds, SnappyStreamingContext} +import org.apache.spark.sql.{SnappyJobValidation, SnappyJobValidate} +import org.apache.spark.streaming.{JavaSnappyStreamingJob, Milliseconds, SnappyStreamingContext} + +abstract class SnappyStreamingJob extends SparkJobBase { + override type C = SnappyStreamingContext + final override def validate(sc: C, config: Config): SparkJobValidation = { + SnappyJobValidate.validate(isValidJob(sc.asInstanceOf[SnappyStreamingContext], config)) + } + + final override def runJob(sc: C, jobConfig: Config): Any = { + runSnappyJob(sc.asInstanceOf[SnappyStreamingContext], jobConfig) + } + + def isValidJob(sc: SnappyStreamingContext, config: Config): SnappyJobValidation + + def runSnappyJob(sc: SnappyStreamingContext, jobConfig: Config): Any -trait SnappyStreamingJob extends SparkJobBase { - type C = SnappyStreamingContext } class SnappyStreamingContextFactory extends SparkContextFactory { @@ -38,7 +51,8 @@ class SnappyStreamingContextFactory extends SparkContextFactory { new SnappyStreamingContext(LeadImpl.getInitializingSparkContext, Milliseconds(interval)) with ContextLike { - override def isValidJob(job: SparkJobBase): Boolean = job.isInstanceOf[SnappyStreamingJob] + override def isValidJob(job: SparkJobBase): Boolean = + job.isInstanceOf[SnappyStreamingJob] || job.isInstanceOf[JavaSnappyStreamingJob] override def stop(): Unit = { try { @@ -50,4 +64,4 @@ class SnappyStreamingContextFactory extends SparkContextFactory { } } } -} +} \ No newline at end of file diff --git a/cluster/src/test/scala/io/snappydata/benchmark/snappy/TPCH_Snappy_Query.scala b/cluster/src/test/scala/io/snappydata/benchmark/snappy/TPCH_Snappy_Query.scala index 33b6b974e0..68cd3de993 100644 ---
a/cluster/src/test/scala/io/snappydata/benchmark/snappy/TPCH_Snappy_Query.scala +++ b/cluster/src/test/scala/io/snappydata/benchmark/snappy/TPCH_Snappy_Query.scala @@ -1,3 +1,20 @@ +/* + * Copyright (c) 2016 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + package io.snappydata.benchmark.snappy import scala.collection.JavaConverters._ @@ -6,12 +23,9 @@ import scala.language.implicitConversions import com.typesafe.config.Config import spark.jobserver.{SparkJobInvalid, SparkJobValid, SparkJobValidation} -import org.apache.spark.sql.{SnappyContext, SnappySQLJob} +import org.apache.spark.sql.{SnappyJobValid, SnappyJobInvalid, SnappyJobValidation, SnappyContext, SnappySQLJob} import org.apache.spark.{SparkConf, SparkContext} -/** - * Created by kishor on 28/1/16. - */ object TPCH_Snappy_Query extends SnappySQLJob{ var sqlSparkProperties: Array[String] = _ @@ -23,7 +37,7 @@ object TPCH_Snappy_Query extends SnappySQLJob{ var runsForAverage: Integer = _ - override def runJob(snc: C, jobConfig: Config): Any = { + override def runSnappyJob(snc: SnappyContext, jobConfig: Config): Any = { jobConfig.entrySet().asScala.foreach(entry => if (entry.getKey.startsWith("spark.sql.")) { val entryString = entry.getKey + "=" + jobConfig.getString(entry.getKey) @@ -89,7 +103,7 @@ object TPCH_Snappy_Query extends SnappySQLJob{ runJob(snc, null) } - override def validate(sc: C, config: Config): SparkJobValidation = { + override def isValidJob(sc: SnappyContext, config: Config): SnappyJobValidation = { var sqlSparkProps = if (config.hasPath("sparkSqlProps")) { config.getString("sparkSqlProps") @@ -101,7 +115,7 @@ object TPCH_Snappy_Query extends SnappySQLJob{ var tempqueries = if (config.hasPath("queries")) { config.getString("queries") } else { - return new SparkJobInvalid("Specify Query number to be executed") + return new SnappyJobInvalid("Specify Query number to be executed") } println(s"tempqueries : $tempqueries") @@ -110,25 +124,25 @@ object TPCH_Snappy_Query extends SnappySQLJob{ useIndex = if (config.hasPath("useIndex")) { config.getBoolean("useIndex") } else { - return new SparkJobInvalid("Specify whether to use Index") + return new SnappyJobInvalid("Specify whether to use Index") } isResultCollection = if (config.hasPath("resultCollection")) { config.getBoolean("resultCollection") } else { - return new SparkJobInvalid("Specify whether to to collect results") + return new SnappyJobInvalid("Specify whether to collect results") } warmUp = if (config.hasPath("warmUpIterations")) { config.getInt("warmUpIterations") } else { - return new SparkJobInvalid("Specify number of warmup iterations ") + return new SnappyJobInvalid("Specify number of warmup iterations ") } runsForAverage = if (config.hasPath("actualRuns")) { config.getInt("actualRuns") } else { - return new SparkJobInvalid("Specify number of iterations of which average result is calculated") + return new SnappyJobInvalid("Specify number of iterations of which
average result is calculated") } - SparkJobValid + new SnappyJobValid() } } diff --git a/cluster/src/test/scala/io/snappydata/benchmark/snappy/TPCH_Snappy_Tables.scala b/cluster/src/test/scala/io/snappydata/benchmark/snappy/TPCH_Snappy_Tables.scala index 44e7f12b8b..85ac685956 100644 --- a/cluster/src/test/scala/io/snappydata/benchmark/snappy/TPCH_Snappy_Tables.scala +++ b/cluster/src/test/scala/io/snappydata/benchmark/snappy/TPCH_Snappy_Tables.scala @@ -22,7 +22,7 @@ import com.typesafe.config.Config import io.snappydata.benchmark.{TPCHColumnPartitionedTable, TPCHReplicatedTable} import spark.jobserver.{SparkJobInvalid, SparkJobValid, SparkJobValidation} -import org.apache.spark.sql.SnappySQLJob +import org.apache.spark.sql.{SnappyJobValid, SnappyJobInvalid, SnappyJobValidation, SnappyContext, SnappySQLJob} object TPCH_Snappy_Tables extends SnappySQLJob{ @@ -33,7 +33,7 @@ object TPCH_Snappy_Tables extends SnappySQLJob{ var useIndex: Boolean = _ var nation_Region_Supp_col = false - override def runJob(snc: C, jobConfig: Config): Any = { + override def runSnappyJob(snc: SnappyContext, jobConfig: Config): Any = { val props : Map[String, String] = null val isSnappy = true @@ -63,7 +63,7 @@ object TPCH_Snappy_Tables extends SnappySQLJob{ } } - override def validate(sc: C, config: Config): SparkJobValidation = { + override def isValidJob(sc: SnappyContext, config: Config): SnappyJobValidation = { tpchDataPath = if (config.hasPath("dataLocation")) { config.getString("dataLocation") @@ -97,15 +97,15 @@ object TPCH_Snappy_Tables extends SnappySQLJob{ } if (!(new File(tpchDataPath)).exists()) { - return new SparkJobInvalid("Incorrect tpch data path. " + + return new SnappyJobInvalid("Incorrect tpch data path. " + "Specify correct location") } useIndex = if (config.hasPath("useIndex")) { config.getBoolean("useIndex") } else { - return new SparkJobInvalid("Specify whether to use Index") + return new SnappyJobInvalid("Specify whether to use Index") } - SparkJobValid + SnappyJobValid() } } diff --git a/cluster/src/test/scala/io/snappydata/filodb/FiloDb_SnappyJob.scala b/cluster/src/test/scala/io/snappydata/filodb/FiloDb_SnappyJob.scala index 7a6e6d7cfc..8ec62bb32a 100644 --- a/cluster/src/test/scala/io/snappydata/filodb/FiloDb_SnappyJob.scala +++ b/cluster/src/test/scala/io/snappydata/filodb/FiloDb_SnappyJob.scala @@ -1,3 +1,20 @@ +/* + * Copyright (c) 2016 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + package io.snappydata.filodb import scala.concurrent.duration.Duration @@ -6,13 +23,10 @@ import scala.util.Random import scala.concurrent.ExecutionContext.Implicits.global import com.typesafe.config.Config -import spark.jobserver.{SparkJobValid, SparkJobValidation} -import org.apache.spark.sql.{DataFrame, SaveMode, SnappyContext, SnappySQLJob} +import org.apache.spark.sql.{SnappyJobValid, SnappyJobValidation, DataFrame, SaveMode, SnappyContext, SnappySQLJob} + -/** - * Created by kishor on 30/3/16. 
- */ object FiloDb_SnappyJob extends SnappySQLJob { var nycTaxiDataPath: String = _ @@ -20,7 +34,7 @@ object FiloDb_SnappyJob extends SnappySQLJob { val cachedDF = new collection.mutable.HashMap[String, DataFrame] - override def runJob(sc: SnappyContext, jobConfig: Config): Any = { + override def runSnappyJob(sc: SnappyContext, jobConfig: Config): Any = { val taxiCsvFile: String = nycTaxiDataPath val numRuns = 50 // Make this higher when doing performance profiling @@ -43,7 +57,8 @@ object FiloDb_SnappyJob extends SnappySQLJob { // trip info for a single driver within a given time range val singleDriverQueries = (1 to 20).map { i => val medallion = medallions(Random.nextInt(medallions.size)) - s"SELECT avg(trip_distance), avg(passenger_count) from nyctaxi where medallion = '$medallion'" + + s"SELECT avg(trip_distance), avg(passenger_count) " + + s"from nyctaxi where medallion = '$medallion'" + s" AND pickup_datetime > '2013-01-15T00Z' AND pickup_datetime < '2013-01-22T00Z'" } @@ -59,7 +74,7 @@ object FiloDb_SnappyJob extends SnappySQLJob { csvDF.printSchema() - val p1 = Map(("PARTITION_BY" -> "medallion") /*,("BUCKETS"-> "5")*/) + val p1 = Map(("PARTITION_BY" -> "medallion") /* ,("BUCKETS"-> "5") */) sc.createTable("NYCTAXI", "column", csvDF.schema, p1) csvDF.write.format("column").mode(SaveMode.Append).options(p1).saveAsTable("NYCTAXI") puts("Ingestion done.") @@ -81,7 +96,8 @@ object FiloDb_SnappyJob extends SnappySQLJob { def runQueries(queries: Array[String], numQueries: Int = 1000): Unit = { val startMillis = System.currentTimeMillis - val futures = (0 until numQueries).map(i => getCachedDF(queries(Random.nextInt(queries.size))).rdd.collectAsync) + val futures = (0 until numQueries).map( + i => getCachedDF(queries(Random.nextInt(queries.size))).rdd.collectAsync) val fut = Future.sequence(futures.asInstanceOf[Seq[Future[Array[_]]]]) Await.result(fut, Duration.Inf) val endMillis = System.currentTimeMillis @@ -101,21 +117,21 @@ object FiloDb_SnappyJob extends SnappySQLJob { } - override def validate(sc: SnappyContext, config: Config): SparkJobValidation = { + override def isValidJob(sc: SnappyContext, config: Config): SnappyJobValidation = { nycTaxiDataPath = if (config.hasPath("dataLocation")) { config.getString("dataLocation") } else { "/QASNAPPY/TPCH/DATA/1" } - var sqlSparkProps = if (config.hasPath("sparkSqlProps")) { + val sqlSparkProps = if (config.hasPath("sparkSqlProps")) { config.getString("sparkSqlProps") } else " " sqlSparkProperties = sqlSparkProps.split(" ") - SparkJobValid + SnappyJobValid() } def puts(s: String): Unit = { diff --git a/docs/jobs.md b/docs/jobs.md index a96e1bfe9f..327c53620f 100644 --- a/docs/jobs.md +++ b/docs/jobs.md @@ -250,21 +250,21 @@ To create a job that can be submitted through the job server, the job must imple ```scala class SnappySampleJob extends SnappySQLJob { /** Snappy uses this as an entry point to execute Snappy jobs.
**/ - def runJob(sc: SnappyContext, jobConfig: Config): Any + def runSnappyJob(sc: SnappyContext, jobConfig: Config): Any /** SnappyData calls this function to validate the job input and reject invalid job requests **/ - def validate(sc: SnappyContext, config: Config): SparkJobValidation + def isValidJob(sc: SnappyContext, config: Config): SnappyJobValidation } ``` ###### Java ```java -class SnappySampleJob extends JavaSnappySQLJob { +class SnappySampleJob extends SnappySQLJob { /** Snappy uses this as an entry point to execute Snappy jobs. **/ - public Object runJavaJob(SnappyContext snc, Config jobConfig) {//Implementation} + public Object runSnappyJob(SnappyContext snc, Config jobConfig) {//Implementation} /** SnappyData calls this function to validate the job input and reject invalid job requests **/ - public JSparkJobValidation isValidJob(SnappyContext snc, Config config) {//validate} + public SnappyJobValidation isValidJob(SnappyContext snc, Config config) {//validate} } ``` @@ -273,10 +273,10 @@ class SnappySampleJob extends JavaSnappySQLJob { ```scala class SnappyStreamingSampleJob extends SnappyStreamingJob { /** Snappy uses this as an entry point to execute Snappy jobs. **/ - def runJob(sc: SnappyStreamingContext, jobConfig: Config): Any + def runSnappyJob(sc: SnappyStreamingContext, jobConfig: Config): Any /** SnappyData calls this function to validate the job input and reject invalid job requests **/ - def validate(sc: SnappyContext, config: Config): SparkJobValidation + def isValidJob(sc: SnappyStreamingContext, config: Config): SnappyJobValidation } ``` ###### Java ```java class SnappyStreamingSampleJob extends JavaSnappyStreamingJob { /** Snappy uses this as an entry point to execute Snappy jobs. **/ - public Object runJavaJob(JavaSnappyStreamingContext snsc, Config jobConfig) {//implementation } + public Object runSnappyJob(JavaSnappyStreamingContext snsc, Config jobConfig) {//implementation } /** SnappyData calls this function to validate the job input and reject invalid job requests **/ - public JSparkJobValidation isValidJob(JavaSnappyStreamingContext snc, Config jobConfig) + public SnappyJobValidation isValidJob(JavaSnappyStreamingContext snc, Config jobConfig) {//validate} } ``` > The _Job_ base classes are simply extensions of the _SparkJobBase_ trait implemented by [Spark JobServer](https://github.com/spark-jobserver/spark-jobserver). -• ```runJob```/```runJavaJob``` contains the implementation of the Job. +• ```runSnappyJob``` contains the implementation of the Job. The [SnappyContext](http://snappydatainc.github.io/snappydata/apidocs/#org.apache.spark.sql.SnappyContext)/[SnappyStreamingContext](http://snappydatainc.github.io/snappydata/apidocs/#org.apache.spark.sql.streaming.SnappyStreamingContext) is managed by the SnappyData Leader (which runs an instance of Spark JobServer) and will be provided to the job through this method. This relieves the developer from the boiler-plate configuration management that comes with the creation of a Spark job and allows the Job Server to manage and re-use contexts. -• ```validate```/```isValidJob``` allows for an initial validation of the context and any provided configuration. - If the context and configuration are OK to run the job, returning spark.jobserver.SparkJobValid - (org.apache.spark.sql.JSparkJobValid for Java) - will let the job execute, otherwise returning spark.jobserver.SparkJobInvalid(reason) - (org.apache.spark.sql.JSparkJobInvalid for Java) prevents +• ```isValidJob``` allows for an initial validation of the context and any provided configuration. + If the context and configuration are OK to run the job, returning org.apache.spark.sql.SnappyJobValid + will let the job execute, otherwise returning org.apache.spark.sql.SnappyJobInvalid(reason) prevents the job from running and provides means to convey the reason of failure. In this case, the call immediately returns an HTTP/1.1 400 Bad Request status code.
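For example, a complete job written against the new API might look like the following sketch (the `AirlineCountJob` object and the `airline_table` property are hypothetical, shown only to illustrate the two methods):

```scala
import com.typesafe.config.Config
import org.apache.spark.sql.{SnappyContext, SnappyJobInvalid, SnappyJobValid, SnappyJobValidation, SnappySQLJob}

object AirlineCountJob extends SnappySQLJob {

  // Entry point: the SnappyContext is created and managed by the job server.
  override def runSnappyJob(snc: SnappyContext, jobConfig: Config): Any = {
    val table = jobConfig.getString("airline_table")
    snc.sql(s"SELECT count(*) FROM $table").collect()
  }

  // Reject the request up front if the required property is missing.
  override def isValidJob(snc: SnappyContext, config: Config): SnappyJobValidation = {
    if (config.hasPath("airline_table")) SnappyJobValid()
    else SnappyJobInvalid("airline_table property missing in APP_PROPS")
  }
}
```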
```isValidJob``` helps you prevent running jobs that would eventually fail due to missing or wrong configuration, saving both time and resources. See [examples](https://github.com/SnappyDataInc/snappydata/tree/master/snappy-examples/src/main/scala/io/snappydata/examples) for Spark and Spark streaming jobs. diff --git a/dtests/src/test/scala/io/snappydata/hydra/AirlineDataQueriesJob.scala b/dtests/src/test/scala/io/snappydata/hydra/AirlineDataQueriesJob.scala index f4328b209d..4162551dd0 100644 --- a/dtests/src/test/scala/io/snappydata/hydra/AirlineDataQueriesJob.scala +++ b/dtests/src/test/scala/io/snappydata/hydra/AirlineDataQueriesJob.scala @@ -18,11 +18,11 @@ package io.snappydata.hydra import java.io.{File, FileOutputStream, PrintWriter} +import scala.util.{Failure, Success, Try} + import com.typesafe.config.Config -import org.apache.spark.sql.{DataFrame, SnappySQLJob} -import spark.jobserver.{SparkJobValid, SparkJobValidation} -import scala.util.{Failure, Success, Try} +import org.apache.spark.sql.{SnappyJobValid, DataFrame, SnappyContext, SnappyJobValidation, SnappySQLJob} /** * Fetches already created tables. Airline table is already persisted in * @@ -36,7 +36,7 @@ import scala.util.{Failure, Success, Try} * Created by swati on 6/4/16. */ object AirlineDataQueriesJob extends SnappySQLJob { - override def runJob(snc: C, jobConfig: Config): Any = { + override def runSnappyJob(snc: SnappyContext, jobConfig: Config): Any = { val colTable = "AIRLINE" val parquetTable = "STAGING_AIRLINE" val rowTable = "AIRLINEREF" @@ -69,42 +69,59 @@ object AirlineDataQueriesJob extends SnappySQLJob { } - //Method for running all olap and oltp queries and calculating total elapsed time for each query collectively at the end along with the number of times query Executed. + // Method for running all OLAP and OLTP queries and calculating total elapsed time + // for each query collectively at the end, along with the number of times each query executed. - def runQueries(pw: PrintWriter, snc: C): Unit = { - var query1ExecutionCount, query2ExecutionCount, query3ExecutionCount, query4ExecutionCount, query5ExecutionCount = 0 - var totalTimeQuery1, totalTimeQuery2, totalTimeQuery3, totalTimeQuery4, totalTimeQuery5: Long = 0 + def runQueries(pw: PrintWriter, snc: SnappyContext): Unit = { + var query1ExecutionCount, query2ExecutionCount, + query3ExecutionCount, query4ExecutionCount, query5ExecutionCount = 0 + var totalTimeQuery1, totalTimeQuery2, totalTimeQuery3, + totalTimeQuery4, totalTimeQuery5: Long = 0 val startTime = System.currentTimeMillis val EndTime: Long = startTime + 600000 while (EndTime > System.currentTimeMillis()) { // while (startTime < EndTime) { - //This Query retrives which airline had the most flights each year. - val query1: String = "select count(*) flightRecCount, description AirlineName, UniqueCarrier carrierCode ,Year_ \n from airline , airlineref\n where airline.UniqueCarrier = airlineref.code\n group by UniqueCarrier,description, Year_ \n order by flightRecCount desc limit 10 " + // This query retrieves which airline had the most flights each year.
+ val query1: String = "select count(*) flightRecCount, description AirlineName, UniqueCarrier carrierCode ,Year_ \n " + + "from airline , airlineref\n " + + "where airline.UniqueCarrier = airlineref.code\n " + + "group by UniqueCarrier,description, Year_ \n " + + "order by flightRecCount desc limit 10 " val query1Result = snc.sql(query1) val startTimeQuery1 = System.currentTimeMillis val result1 = query1Result.collect() totalTimeQuery1 += (System.currentTimeMillis - startTimeQuery1) query1ExecutionCount += 1 - //This query retrives which Airlines Arrive On Schedule - val query2: String = "select AVG(ArrDelay) arrivalDelay, UniqueCarrier carrier from airline \n" + " group by UniqueCarrier\n" + " order by arrivalDelay " + // This query retrieves which airlines arrive on schedule + val query2: String = "select AVG(ArrDelay) arrivalDelay, UniqueCarrier carrier from airline \n" + + " group by UniqueCarrier\n" + + "order by arrivalDelay " val query2Result = snc.sql(query2) val startTimeQuery2 = System.currentTimeMillis val result2 = query2Result.collect() totalTimeQuery2 += (System.currentTimeMillis - startTimeQuery2) query2ExecutionCount += 1 - //This method retrives which Airlines Arrive On Schedule. JOIN with reference table. - val query3: String = "select AVG(ArrDelay) arrivalDelay, description AirlineName, UniqueCarrier carrier \n from airline, airlineref\n where airline.UniqueCarrier = airlineref.Code \n group by UniqueCarrier, description \n order by arrivalDelay " + // This query retrieves which airlines arrive on schedule. JOIN with reference table. + val query3: String = "select AVG(ArrDelay) arrivalDelay, description AirlineName, UniqueCarrier carrier \n" + + "from airline, airlineref \n" + + "where airline.UniqueCarrier = airlineref.Code \n " + + "group by UniqueCarrier, description \n " + + "order by arrivalDelay " + val Query3Result = snc.sql(query3) val startTimeQuery3 = System.currentTimeMillis val result3 = Query3Result.collect() totalTimeQuery3 += (System.currentTimeMillis - startTimeQuery3) query3ExecutionCount += 1 - //This query retrives the trend in arrival delays across all airlines in the US - val query4: String = "select AVG(ArrDelay) ArrivalDelay, Year_\n from airline \n group by Year_ \n order by Year_ " + // This query retrieves the trend in arrival delays across all airlines in the US + val query4: String = "select AVG(ArrDelay) ArrivalDelay, Year_\n " + + "from airline \n " + + "group by Year_ \n " + + "order by Year_ " val Query4Result = snc.sql(query4) val startTimeQuery4 = System.currentTimeMillis val result4 = Query4Result.collect() @@ -112,8 +129,12 @@ object AirlineDataQueriesJob extends SnappySQLJob { query4ExecutionCount += 1 - //This query retrives Which airline out of SanFrancisco had most delays due to weather - val query5: String = "SELECT sum(WeatherDelay) totalWeatherDelay, airlineref.DESCRIPTION \n FROM airline, airlineref \n WHERE airline.UniqueCarrier = airlineref.CODE AND Origin like '%SFO%' AND WeatherDelay > 0 \n GROUP BY DESCRIPTION \n limit 20" + // This query retrieves which airline out of San Francisco had most delays due to weather + val query5: String = "SELECT sum(WeatherDelay) totalWeatherDelay, airlineref.DESCRIPTION \n " + + " FROM airline, airlineref \n " + + " WHERE airline.UniqueCarrier = airlineref.CODE" + + " AND Origin like '%SFO%' AND WeatherDelay > 0 \n" + + " GROUP BY DESCRIPTION \n limit 20" val Query5Result = snc.sql(query5) val startTimeQuery5 = System.currentTimeMillis @@ -121,33 +142,29 @@ object AirlineDataQueriesJob extends
SnappySQLJob { totalTimeQuery5 += (System.currentTimeMillis - startTimeQuery5) query5ExecutionCount += 1 // startTime = System.currentTimeMillis -// pw.println(s"\n****** startTime is " + startTime + " ms And endTime is " + EndTime + " ms****") -// pw.flush() + // pw.flush() } pw.println(s"\n****** countQueryWithGroupByOrderBy Execution " + - s"took ${totalTimeQuery1} ms******") + s"took ${totalTimeQuery1} ms******") pw.println(s"\n****** countQueryWithGroupByOrderBy Execution " + - s"count is :: ${query1ExecutionCount} ******") + s"count is :: ${query1ExecutionCount} ******") pw.println(s"\n****** avgArrDelayWithGroupByOrderByForScheduleQuery Execution " + - s"took ${totalTimeQuery2} ms******") + s"took ${totalTimeQuery2} ms******") pw.println(s"\n****** avgArrDelayWithGroupByOrderByForScheduleQuery Execution " + - s"count is :: ${query2ExecutionCount} ******") + s"count is :: ${query2ExecutionCount} ******") pw.println(s"\n****** avgArrDelayWithGroupByOrderByWithJoinForScheduleQuery Execution " + - s"took ${totalTimeQuery3} ms******") + s"took ${totalTimeQuery3} ms******") pw.println(s"\n****** avgArrDelayWithGroupByOrderByWithJoinForScheduleQuery Execution " + - s"count is :: ${query3ExecutionCount} ******") + s"count is :: ${query3ExecutionCount} ******") pw.println(s"\n****** avgArrDelayWithGroupByOrderByForTrendAnalysisQuery Execution " + - s"took ${totalTimeQuery4} ms******") + s"took ${totalTimeQuery4} ms******") pw.println(s"\n****** avgArrDelayWithGroupByOrderByForTrendAnalysisQuery Execution " + - s"count is :: ${query4ExecutionCount} ******") + s"count is :: ${query4ExecutionCount} ******") pw.println(s"\n****** sumWeatherDelayWithGroupByWithLimitQuery Execution " + - s"took ${totalTimeQuery5} ms******") + s"took ${totalTimeQuery5} ms******") pw.println(s"\n****** sumWeatherDelayWithGroupByWithLimitQuery Execution " + - s"count is :: ${query5ExecutionCount} ******") - } - - override def validate(sc: C, config: Config): SparkJobValidation = { - SparkJobValid + s"count is :: ${query5ExecutionCount} ******") } + override def isValidJob(sc: SnappyContext, config: Config): SnappyJobValidation = SnappyJobValid() } diff --git a/dtests/src/test/scala/io/snappydata/hydra/FileStreamingJob.scala b/dtests/src/test/scala/io/snappydata/hydra/FileStreamingJob.scala index 8f5b887aee..05e6406088 100644 --- a/dtests/src/test/scala/io/snappydata/hydra/FileStreamingJob.scala +++ b/dtests/src/test/scala/io/snappydata/hydra/FileStreamingJob.scala @@ -19,11 +19,12 @@ package io.snappydata.hydra import java.io.PrintWriter import com.typesafe.config.Config -import org.apache.spark.sql.SaveMode + import org.apache.spark.sql.streaming.{SchemaDStream, SnappyStreamingJob} import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.{SnappyJobValid, SaveMode, SnappyJobValidation} +import org.apache.spark.streaming.SnappyStreamingContext import org.apache.spark.streaming.dstream.DStream -import spark.jobserver.{SparkJobValid, SparkJobValidation} /** * Created by swati on 7/4/16. 
@@ -31,7 +32,7 @@ import spark.jobserver.{SparkJobValid, SparkJobValidation} object FileStreamingJob extends SnappyStreamingJob { - override def runJob(snsc: C, jobConfig: Config): Any = { + override def runSnappyJob(snsc: SnappyStreamingContext, jobConfig: Config): Any = { def getCurrentDirectory = new java.io.File(".").getCanonicalPath @@ -48,19 +49,19 @@ object FileStreamingJob extends SnappyStreamingJob { pw.println("##### Running example with stored tweet data #####") snsc.sql("CREATE STREAM TABLE hashtagtable (hashtag STRING) USING file_stream " + - "OPTIONS (storagelevel 'MEMORY_AND_DISK_SER_2', " + - "rowConverter 'org.apache.spark.sql.streaming.TweetToHashtagRow'," + - "directory '" + dataDir + "/copiedtwitterdata')") + "OPTIONS (storagelevel 'MEMORY_AND_DISK_SER_2', " + + "rowConverter 'org.apache.spark.sql.streaming.TweetToHashtagRow'," + + "directory '" + dataDir + "/copiedtwitterdata')") snsc.sql("CREATE STREAM TABLE retweettable (retweetId LONG, retweetCnt INT, " + - "retweetTxt STRING) USING file_stream " + - "OPTIONS (storagelevel 'MEMORY_AND_DISK_SER_2', " + - "rowConverter 'org.apache.spark.sql.streaming.TweetToRetweetRow'," + - "directory '" + dataDir + "/copiedtwitterdata')") + "retweetTxt STRING) USING file_stream " + + "OPTIONS (storagelevel 'MEMORY_AND_DISK_SER_2', " + + "rowConverter 'org.apache.spark.sql.streaming.TweetToRetweetRow'," + + "directory '" + dataDir + "/copiedtwitterdata')") // Register continuous queries on the tables and specify window clauses val retweetStream: SchemaDStream = snsc.registerCQ("SELECT * FROM retweettable " + - "WINDOW (DURATION 2 SECONDS, SLIDE 2 SECONDS)") + "WINDOW (DURATION 2 SECONDS, SLIDE 2 SECONDS)") val tableName = "retweetStore" @@ -70,7 +71,7 @@ object FileStreamingJob extends SnappyStreamingJob { // When a tweet is retweeted multiple times, the previous entry of the tweet // is over written by the new retweet count. 
snsc.snappyContext.sql(s"CREATE TABLE $tableName (retweetId BIGINT, " + - s"retweetCnt INT, retweetTxt STRING) USING row OPTIONS ()") + s"retweetCnt INT, retweetTxt STRING) USING row OPTIONS ()") var totalSize: Long = 0; // Save data in snappy store @@ -95,15 +96,16 @@ object FileStreamingJob extends SnappyStreamingJob { // Query the snappystore Row table to find out the top retweets pw.println("\n Top 10 popular tweets - Query Row table \n") snsc.snappyContext.sql(s"SELECT retweetId AS RetweetId, " + - s"retweetCnt AS RetweetsCount, retweetTxt AS Text FROM ${tableName}" + - s" ORDER BY RetweetsCount DESC LIMIT 10") - .collect.foreach(row => { + s"retweetCnt AS RetweetsCount, retweetTxt AS Text FROM ${tableName}" + + s" ORDER BY RetweetsCount DESC LIMIT 10") + .collect.foreach(row => { pw.println(row.toString()) }) pw.println("\n#######################################################") - pw.println("\n Select count(*) query to get incremented counter as the time progresses - Query Row table \n") + pw.println("\n Select count(*) query to get incremented counter as " + + "the time progresses - Query Row table \n") snsc.snappyContext.sql(s"SELECT count(*) FROM ${tableName}") - .collect.foreach(row => { + .collect.foreach(row => { pw.println(row.toString()) }) Thread.sleep(2000) @@ -118,8 +120,7 @@ object FileStreamingJob extends SnappyStreamingJob { } - override def validate(snsc: C, config: Config): SparkJobValidation = { - SparkJobValid - } + override def isValidJob(sc: SnappyStreamingContext, config: Config): SnappyJobValidation = + SnappyJobValid() } diff --git a/examples/src/main/java/io/snappydata/examples/JavaCreateAndLoadAirlineDataJob.java b/examples/src/main/java/io/snappydata/examples/JavaCreateAndLoadAirlineDataJob.java index e18e68cd6d..9fe78a9deb 100644 --- a/examples/src/main/java/io/snappydata/examples/JavaCreateAndLoadAirlineDataJob.java +++ b/examples/src/main/java/io/snappydata/examples/JavaCreateAndLoadAirlineDataJob.java @@ -1,3 +1,20 @@ +/* + * Copyright (c) 2016 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + package io.snappydata.examples; import java.io.File; @@ -9,12 +26,12 @@ import com.typesafe.config.Config; import org.apache.spark.sql.DataFrame; -import org.apache.spark.sql.JSparkJobInvalid; -import org.apache.spark.sql.JSparkJobValid; -import org.apache.spark.sql.JSparkJobValidation; -import org.apache.spark.sql.JavaSnappySQLJob; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SnappyContext; +import org.apache.spark.sql.SnappyJobInvalid; +import org.apache.spark.sql.SnappyJobValid; +import org.apache.spark.sql.SnappyJobValidation; +import org.apache.spark.sql.SnappySQLJob; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -22,7 +39,7 @@ * Creates and loads Airline data from parquet files in row and column * tables. Also samples the data and stores it in a column table. 
*/ -public class JavaCreateAndLoadAirlineDataJob extends JavaSnappySQLJob { +public class JavaCreateAndLoadAirlineDataJob extends SnappySQLJob { private String airlinefilePath = null; @@ -33,7 +50,7 @@ public class JavaCreateAndLoadAirlineDataJob extends JavaSnappySQLJob { private static final String stagingAirline = "STAGING_AIRLINE"; @Override - public Object runJavaJob(SnappyContext snc, Config jobConfig) { + public Object runSnappyJob(SnappyContext snc, Config jobConfig) { PrintWriter pw = null; String currentDirectory = null; boolean success = false; @@ -108,7 +125,7 @@ public Object runJavaJob(SnappyContext snc, Config jobConfig) { } @Override - public JSparkJobValidation isValidJob(SnappyContext snc, Config config) { + public SnappyJobValidation isValidJob(SnappyContext snc, Config config) { if (config.hasPath("airline_file")) { airlinefilePath = config.getString("airline_file"); @@ -117,7 +134,7 @@ public JSparkJobValidation isValidJob(SnappyContext snc, Config config) { } if (!(new File(airlinefilePath)).exists()) { - return new JSparkJobInvalid("Incorrect airline path. " + + return new SnappyJobInvalid("Incorrect airline path. " + "Specify airline_file property in APP_PROPS"); } @@ -127,11 +144,11 @@ public JSparkJobValidation isValidJob(SnappyContext snc, Config config) { airlinereftablefilePath = "../../quickstart/data/airportcodeParquetData"; } if (!(new File(airlinereftablefilePath)).exists()) { - return new JSparkJobInvalid("Incorrect airline ref path. " + + return new SnappyJobInvalid("Incorrect airline ref path. " + "Specify airlineref_file property in APP_PROPS"); } - return new JSparkJobValid(); + return new SnappyJobValid(); } private static StructType replaceReservedWords(StructType airlineSchema) { diff --git a/examples/src/main/java/io/snappydata/examples/JavaTwitterPopularTagsJob.java b/examples/src/main/java/io/snappydata/examples/JavaTwitterPopularTagsJob.java index ca03fe8c5d..ba3b22ea69 100644 --- a/examples/src/main/java/io/snappydata/examples/JavaTwitterPopularTagsJob.java +++ b/examples/src/main/java/io/snappydata/examples/JavaTwitterPopularTagsJob.java @@ -1,3 +1,20 @@ +/* + * Copyright (c) 2016 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ + package io.snappydata.examples; import java.io.IOException; @@ -8,10 +25,10 @@ import com.typesafe.config.Config; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.sql.DataFrame; -import org.apache.spark.sql.JSparkJobValid; -import org.apache.spark.sql.JSparkJobValidation; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SnappyJobValid; +import org.apache.spark.sql.SnappyJobValidation; import org.apache.spark.sql.streaming.SchemaDStream; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -42,7 +59,7 @@ public class JavaTwitterPopularTagsJob extends JavaSnappyStreamingJob { @Override - public Object runJavaJob(JavaSnappyStreamingContext snsc, Config jobConfig) { + public Object runSnappyJob(JavaSnappyStreamingContext snsc, Config jobConfig) { JavaDStream stream = null; PrintWriter pw = null; @@ -202,7 +219,7 @@ private void printResult(Row[] result, PrintWriter pw) { } @Override - public JSparkJobValidation isValidJob(JavaSnappyStreamingContext snc, Config jobConfig) { - return new JSparkJobValid(); + public SnappyJobValidation isValidJob(JavaSnappyStreamingContext snc, Config jobConfig) { + return new SnappyJobValid(); } } diff --git a/examples/src/main/scala/io/snappydata/examples/AirlineDataJob.scala b/examples/src/main/scala/io/snappydata/examples/AirlineDataJob.scala index cf232cccfd..691dbd48f3 100644 --- a/examples/src/main/scala/io/snappydata/examples/AirlineDataJob.scala +++ b/examples/src/main/scala/io/snappydata/examples/AirlineDataJob.scala @@ -1,3 +1,20 @@ +/* + * Copyright (c) 2016 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + package io.snappydata.examples import java.io.PrintWriter @@ -5,10 +22,9 @@ import java.io.PrintWriter import scala.util.{Failure, Success, Try} import com.typesafe.config.Config -import spark.jobserver.{SparkJobValid, SparkJobValidation} import org.apache.spark.sql.snappy._ -import org.apache.spark.sql.{SnappyContext, DataFrame, SnappySQLJob} +import org.apache.spark.sql.{SnappyJobValid, DataFrame, SnappyContext, SnappyJobValidation, SnappySQLJob} /** * Fetches already created tables. 
Airline table is already persisted in @@ -19,7 +35,7 @@ import org.apache.spark.sql.{SnappyContext, DataFrame, SnappySQLJob} */ object AirlineDataJob extends SnappySQLJob { - override def runJob(snc: SnappyContext, jobConfig: Config): Any = { + override def runSnappyJob(snc: SnappyContext, jobConfig: Config): Any = { val colTable = "AIRLINE" val parquetTable = "STAGING_AIRLINE" val rowTable = "AIRLINEREF" @@ -112,7 +128,5 @@ object AirlineDataJob extends SnappySQLJob { // scalastyle:on println } - override def validate(sc: C, config: Config): SparkJobValidation = { - SparkJobValid - } + override def isValidJob(sc: SnappyContext, config: Config): SnappyJobValidation = SnappyJobValid() } diff --git a/examples/src/main/scala/io/snappydata/examples/AirlineDataSparkApp.scala b/examples/src/main/scala/io/snappydata/examples/AirlineDataSparkApp.scala index 49ada5641a..7bcefd5e96 100644 --- a/examples/src/main/scala/io/snappydata/examples/AirlineDataSparkApp.scala +++ b/examples/src/main/scala/io/snappydata/examples/AirlineDataSparkApp.scala @@ -1,19 +1,36 @@ +/* + * Copyright (c) 2016 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + package io.snappydata.examples import org.apache.spark.sql.{Row, SnappyContext, DataFrame} import org.apache.spark.{SparkContext, SparkConf} /** - * This application depicts how a Spark cluster can - * connect to a Snappy cluster to fetch and query the tables - * using Scala APIs in a Spark App. - */ + * This application depicts how a Spark cluster can + * connect to a Snappy cluster to fetch and query the tables + * using Scala APIs in a Spark App. + */ object AirlineDataSparkApp { def main(args: Array[String]) { // scalastyle:off println val conf = new SparkConf(). - setAppName("Airline Data Application") + setAppName("Airline Data Application") val sc = new SparkContext(conf) val snc = SnappyContext(sc) @@ -40,7 +57,7 @@ object AirlineDataSparkApp { // re-brands itself as 'Delta America'. Update the row table. val query: String = " CODE ='DL'" val newColumnValues: Row = Row("Delta America") - snc.update(rowTableName,query,newColumnValues,"DESCRIPTION") + snc.update(rowTableName, query, newColumnValues, "DESCRIPTION") // Data Frame query: Which Airlines Arrive On Schedule? JOIN with reference table val colResultAftUpd = airlineDF.join(airlineCodeDF, airlineDF.col("UniqueCarrier"). diff --git a/examples/src/main/scala/io/snappydata/examples/CreateAndLoadAirlineDataJob.scala b/examples/src/main/scala/io/snappydata/examples/CreateAndLoadAirlineDataJob.scala index a7e77ee9c9..983415da71 100644 --- a/examples/src/main/scala/io/snappydata/examples/CreateAndLoadAirlineDataJob.scala +++ b/examples/src/main/scala/io/snappydata/examples/CreateAndLoadAirlineDataJob.scala @@ -1,3 +1,20 @@ +/* + * Copyright (c) 2016 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License.
You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + package io.snappydata.examples import java.io.{File, PrintWriter} @@ -5,14 +22,14 @@ import java.io.{File, PrintWriter} import scala.util.{Failure, Success, Try} import com.typesafe.config.Config -import org.apache.spark.sql.types.{StructType, StructField} -import org.apache.spark.sql.{SnappyContext, SaveMode, SnappySQLJob} -import spark.jobserver.{SparkJobInvalid, SparkJobValid, SparkJobValidation} + +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.{SaveMode, SnappyContext, SnappyJobInvalid, SnappyJobValid, SnappyJobValidation, SnappySQLJob} /** - * Creates and loads Airline data from parquet files in row and column - * tables. Also samples the data and stores it in a column table. - */ + * Creates and loads Airline data from parquet files in row and column + * tables. Also samples the data and stores it in a column table. + */ object CreateAndLoadAirlineDataJob extends SnappySQLJob { var airlinefilePath: String = _ @@ -22,7 +39,7 @@ object CreateAndLoadAirlineDataJob extends SnappySQLJob { val sampleTable = "AIRLINE_SAMPLE" val stagingAirline = "STAGING_AIRLINE" - override def runJob(snc: SnappyContext, jobConfig: Config): Any = { + override def runSnappyJob(snc: SnappyContext, jobConfig: Config): Any = { def getCurrentDirectory = new java.io.File(".").getCanonicalPath val pw = new PrintWriter("CreateAndLoadAirlineDataJob.out") Try { @@ -86,10 +103,10 @@ object CreateAndLoadAirlineDataJob extends SnappySQLJob { } /** - * Validate if the data files are available, else throw SparkJobInvalid - * - */ - override def validate(snc: SnappyContext, config: Config): SparkJobValidation = { + * Validate if the data files are available, else return SnappyJobInvalid + * + */ + override def isValidJob(snc: SnappyContext, config: Config): SnappyJobValidation = { airlinefilePath = if (config.hasPath("airline_file")) { config.getString("airline_file") @@ -98,7 +115,7 @@ object CreateAndLoadAirlineDataJob extends SnappySQLJob { } if (!(new File(airlinefilePath)).exists()) { - return new SparkJobInvalid("Incorrect airline path. " + + return new SnappyJobInvalid("Incorrect airline path. " + "Specify airline_file property in APP_PROPS") } @@ -108,20 +125,20 @@ object CreateAndLoadAirlineDataJob extends SnappySQLJob { "../../quickstart/data/airportcodeParquetData" } if (!(new File(airlinereftablefilePath)).exists()) { - return new SparkJobInvalid("Incorrect airline ref path. " + + return new SnappyJobInvalid("Incorrect airline ref path.
" + "Specify airlineref_file property in APP_PROPS") } - SparkJobValid + SnappyJobValid() } /** - * Replace the words that are reserved in Snappy store - * @param airlineSchema schema with reserved words - * @return updated schema - */ - private def replaceReservedWords(airlineSchema : StructType) : StructType = { - new StructType( airlineSchema.map( s => { + * Replace the words that are reserved in Snappy store + * @param airlineSchema schema with reserved words + * @return updated schema + */ + private def replaceReservedWords(airlineSchema: StructType): StructType = { + new StructType(airlineSchema.map(s => { if (s.name.equals("Year")) { new StructField("Year_", s.dataType, s.nullable, s.metadata) } @@ -130,6 +147,7 @@ object CreateAndLoadAirlineDataJob extends SnappySQLJob { } else { s - }}).toArray) + } + }).toArray) } } diff --git a/examples/src/main/scala/io/snappydata/examples/StreamingUtils.scala b/examples/src/main/scala/io/snappydata/examples/StreamingUtils.scala index ea0b8f4328..cc7536c8bc 100644 --- a/examples/src/main/scala/io/snappydata/examples/StreamingUtils.scala +++ b/examples/src/main/scala/io/snappydata/examples/StreamingUtils.scala @@ -1,3 +1,20 @@ +/* + * Copyright (c) 2016 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ + package io.snappydata.examples import com.typesafe.config.Config @@ -13,10 +30,10 @@ import twitter4j.conf.{ConfigurationBuilder} object StreamingUtils { def convertTweetToRow(message: Any, schema: StructType): Seq[Row] = { - var json : JSONObject = null + var json: JSONObject = null var arr: Array[GenericRowWithSchema] = null - if (message.isInstanceOf[String]){ - //for file stream + if (message.isInstanceOf[String]) { + // for file stream json = new JSONObject(message.asInstanceOf[String]) val hashArray = json.get("hashtagEntities").asInstanceOf[JSONArray] arr = new Array[GenericRowWithSchema](hashArray.length()) @@ -26,8 +43,8 @@ object StreamingUtils { arr(i) = new GenericRowWithSchema(Array(UTF8String.fromString(b)), schema) } - }else { - //for twitter stream + } else { + // for twitter stream val status = message.asInstanceOf[Status] val hashArray = status.getHashtagEntities arr = new Array[GenericRowWithSchema](hashArray.length) @@ -42,22 +59,22 @@ object StreamingUtils { } - def convertPopularTweetsToRow(message: Any): Array[TwitterSchema] = { + def convertPopularTweetsToRow(message: Any): Array[TwitterSchema] = { var json: JSONObject = null - var retweetCnt : Int = 0 - var retweetTxt : String = null + var retweetCnt: Int = 0 + var retweetTxt: String = null if (message.isInstanceOf[String]) { - //for file stream + // for file stream json = new JSONObject(message.asInstanceOf[String]) - if(json != null && json.has("retweetedStatus")) { + if (json != null && json.has("retweetedStatus")) { val retweetedSts = json.getJSONObject("retweetedStatus") retweetTxt = retweetedSts.get("text").asInstanceOf[String] retweetCnt = retweetedSts.get("retweetCount").asInstanceOf[Int] } } else { - //for twitter stream + // for twitter stream val status = message.asInstanceOf[Status] - if(status.getRetweetedStatus != null) { + if (status.getRetweetedStatus != null) { retweetTxt = status.getRetweetedStatus.getText retweetCnt = status.getRetweetedStatus.getRetweetCount } @@ -71,14 +88,14 @@ object StreamingUtils { // Generate twitter configuration and authorization new OAuthAuthorization( new ConfigurationBuilder() - .setOAuthConsumerKey(jobConfig.getString("consumerKey")) - .setOAuthConsumerSecret(jobConfig.getString("consumerSecret")) - .setOAuthAccessToken(jobConfig.getString("accessToken")) - .setOAuthAccessTokenSecret(jobConfig.getString("accessTokenSecret")) - .setJSONStoreEnabled(true) - .build()) + .setOAuthConsumerKey(jobConfig.getString("consumerKey")) + .setOAuthConsumerSecret(jobConfig.getString("consumerSecret")) + .setOAuthAccessToken(jobConfig.getString("accessToken")) + .setOAuthAccessTokenSecret(jobConfig.getString("accessTokenSecret")) + .setJSONStoreEnabled(true) + .build()) } } -case class TwitterSchema(retweetCnt : Int, retweetTxt: String) +case class TwitterSchema(retweetCnt: Int, retweetTxt: String) diff --git a/examples/src/main/scala/io/snappydata/examples/TwitterPopularTagsJob.scala b/examples/src/main/scala/io/snappydata/examples/TwitterPopularTagsJob.scala index 9bf09ef8c9..5137b6e5c0 100644 --- a/examples/src/main/scala/io/snappydata/examples/TwitterPopularTagsJob.scala +++ b/examples/src/main/scala/io/snappydata/examples/TwitterPopularTagsJob.scala @@ -1,13 +1,30 @@ +/* + * Copyright (c) 2016 SnappyData, Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. 
You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + package io.snappydata.examples import java.io.PrintWriter import com.typesafe.config.Config -import spark.jobserver.{SparkJobValid, SparkJobValidation} -import org.apache.spark.sql.SaveMode import org.apache.spark.sql.streaming.{SchemaDStream, SnappyStreamingJob} import org.apache.spark.sql.types._ +import org.apache.spark.sql.{SaveMode, SnappyJobValid, SnappyJobValidation} +import org.apache.spark.streaming.SnappyStreamingContext import org.apache.spark.streaming.dstream.DStream /** @@ -29,7 +46,7 @@ import org.apache.spark.streaming.dstream.DStream object TwitterPopularTagsJob extends SnappyStreamingJob { - override def runJob(snsc: C, jobConfig: Config): Any = { + override def runSnappyJob(snsc: SnappyStreamingContext, jobConfig: Config): Any = { def getCurrentDirectory = new java.io.File(".").getCanonicalPath @@ -39,58 +56,58 @@ object TwitterPopularTagsJob extends SnappyStreamingJob { val pw = new PrintWriter(outFileName) val schema = StructType(List(StructField("hashtag", StringType))) - + snsc.snappyContext.sql("DROP TABLE IF EXISTS topktable") snsc.snappyContext.sql("DROP TABLE IF EXISTS hashtagtable") snsc.snappyContext.sql("DROP TABLE IF EXISTS retweettable") if (jobConfig.hasPath("consumerKey") && jobConfig.hasPath("consumerSecret") - && jobConfig.hasPath("accessToken") && jobConfig.hasPath("accessTokenSecret") ) { + && jobConfig.hasPath("accessToken") && jobConfig.hasPath("accessTokenSecret")) { pw.println("##### Running example with live twitter stream #####") // Create twitter stream table snsc.sql("CREATE STREAM TABLE hashtagtable (hashtag STRING) USING " + - "twitter_stream OPTIONS (" + - s"consumerKey '${jobConfig.getString("consumerKey")}', " + - s"consumerSecret '${jobConfig.getString("consumerSecret")}', " + - s"accessToken '${jobConfig.getString("accessToken")}', " + - s"accessTokenSecret '${jobConfig.getString("accessTokenSecret")}', " + - "rowConverter 'org.apache.spark.sql.streaming.TweetToHashtagRow')") + "twitter_stream OPTIONS (" + + s"consumerKey '${jobConfig.getString("consumerKey")}', " + + s"consumerSecret '${jobConfig.getString("consumerSecret")}', " + + s"accessToken '${jobConfig.getString("accessToken")}', " + + s"accessTokenSecret '${jobConfig.getString("accessTokenSecret")}', " + + "rowConverter 'org.apache.spark.sql.streaming.TweetToHashtagRow')") snsc.sql("CREATE STREAM TABLE retweettable (retweetId LONG, retweetCnt INT, " + - "retweetTxt STRING) USING twitter_stream OPTIONS (" + - s"consumerKey '${jobConfig.getString("consumerKey")}', " + - s"consumerSecret '${jobConfig.getString("consumerSecret")}', " + - s"accessToken '${jobConfig.getString("accessToken")}', " + - s"accessTokenSecret '${jobConfig.getString("accessTokenSecret")}', " + - "rowConverter 'org.apache.spark.sql.streaming.TweetToRetweetRow')") + "retweetTxt STRING) USING twitter_stream OPTIONS (" + + s"consumerKey '${jobConfig.getString("consumerKey")}', " + + s"consumerSecret '${jobConfig.getString("consumerSecret")}', " + + s"accessToken '${jobConfig.getString("accessToken")}', " + + s"accessTokenSecret '${jobConfig.getString("accessTokenSecret")}', " +
+ "rowConverter 'org.apache.spark.sql.streaming.TweetToRetweetRow')") } else { // Create file stream table pw.println("##### Running example with stored tweet data #####") snsc.sql("CREATE STREAM TABLE hashtagtable (hashtag STRING) USING file_stream " + - "OPTIONS (storagelevel 'MEMORY_AND_DISK_SER_2', " + - "rowConverter 'org.apache.spark.sql.streaming.TweetToHashtagRow'," + - "directory '/tmp/copiedtwitterdata')") + "OPTIONS (storagelevel 'MEMORY_AND_DISK_SER_2', " + + "rowConverter 'org.apache.spark.sql.streaming.TweetToHashtagRow'," + + "directory '/tmp/copiedtwitterdata')") snsc.sql("CREATE STREAM TABLE retweettable (retweetId LONG, retweetCnt INT, " + - "retweetTxt STRING) USING file_stream " + - "OPTIONS (storagelevel 'MEMORY_AND_DISK_SER_2', " + - "rowConverter 'org.apache.spark.sql.streaming.TweetToRetweetRow'," + - "directory '/tmp/copiedtwitterdata')") + "retweetTxt STRING) USING file_stream " + + "OPTIONS (storagelevel 'MEMORY_AND_DISK_SER_2', " + + "rowConverter 'org.apache.spark.sql.streaming.TweetToRetweetRow'," + + "directory '/tmp/copiedtwitterdata')") } // Register continuous queries on the tables and specify window clauses val retweetStream: SchemaDStream = snsc.registerCQ("SELECT * FROM retweettable " + - "WINDOW (DURATION 2 SECONDS, SLIDE 2 SECONDS)") + "WINDOW (DURATION 2 SECONDS, SLIDE 2 SECONDS)") val topKOption = Map( - "epoch" -> System.currentTimeMillis().toString, - "timeInterval" -> "2000ms", - "size" -> "10" - ) + "epoch" -> System.currentTimeMillis().toString, + "timeInterval" -> "2000ms", + "size" -> "10" + ) // Create TopK table on the base stream table which is hashtagtable // TopK object is automatically populated from the stream table @@ -99,13 +116,13 @@ object TwitterPopularTagsJob extends SnappyStreamingJob { val tableName = "retweetStore" - snsc.snappyContext.dropTable(tableName, true ) + snsc.snappyContext.dropTable(tableName, true) // Create row table to insert retweets based on retweetId as Primary key // When a tweet is retweeted multiple times, the previous entry of the tweet // is over written by the new retweet count. snsc.snappyContext.sql(s"CREATE TABLE $tableName (retweetId BIGINT PRIMARY KEY, " + - s"retweetCnt INT, retweetTxt STRING) USING row OPTIONS ()") + s"retweetCnt INT, retweetTxt STRING) USING row OPTIONS ()") // Save data in snappy store retweetStream.foreachDataFrame(df => { @@ -117,11 +134,10 @@ object TwitterPopularTagsJob extends SnappyStreamingJob { // Iterate over the streaming data for twitter data and publish the results to a file. 
try { - val runTime = if(jobConfig.hasPath("streamRunTime")) - { + val runTime = if (jobConfig.hasPath("streamRunTime")) { jobConfig.getString("streamRunTime").toInt * 1000 } else { - 120*1000 + 120 * 1000 } val end = System.currentTimeMillis + runTime @@ -147,9 +163,9 @@ object TwitterPopularTagsJob extends SnappyStreamingJob { // Query the snappystore Row table to find out the top retweets pw.println("\n####### Top 10 popular tweets - Query Row table #######\n") snsc.snappyContext.sql(s"SELECT retweetId AS RetweetId, " + - s"retweetCnt AS RetweetsCount, retweetTxt AS Text FROM ${tableName}" + - s" ORDER BY RetweetsCount DESC LIMIT 10") - .collect.foreach(row => { + s"retweetCnt AS RetweetsCount, retweetTxt AS Text FROM ${tableName}" + + s" ORDER BY RetweetsCount DESC LIMIT 10") + .collect.foreach(row => { pw.println(row.toString()) }) @@ -166,8 +182,8 @@ object TwitterPopularTagsJob extends SnappyStreamingJob { // scalastyle:on println } - override def validate(snsc: C, config: Config): SparkJobValidation = { - SparkJobValid + override def isValidJob(snsc: SnappyStreamingContext, config: Config): SnappyJobValidation = { + SnappyJobValid() }
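Taken together, a minimal streaming job written against the reworked `SnappyStreamingJob` base class follows the same shape as the SQL job above. The sketch below is illustrative only: `HashtagCountJob` is a hypothetical name, and the continuous query assumes a `hashtagtable` stream table like the one created in the examples in this change.

```scala
import com.typesafe.config.Config
import org.apache.spark.sql.{SnappyJobValid, SnappyJobValidation}
import org.apache.spark.sql.streaming.SnappyStreamingJob
import org.apache.spark.streaming.SnappyStreamingContext

object HashtagCountJob extends SnappyStreamingJob {

  // Entry point: the SnappyStreamingContext is created and managed by the job server.
  override def runSnappyJob(snsc: SnappyStreamingContext, jobConfig: Config): Any = {
    // Register a windowed continuous query on an existing stream table
    // (hashtagtable is assumed to have been created as in the examples above).
    val stream = snsc.registerCQ(
      "SELECT * FROM hashtagtable WINDOW (DURATION 2 SECONDS, SLIDE 2 SECONDS)")
    stream.foreachDataFrame(df => df.show())
    snsc.start()
    // Run for 30 seconds in this sketch, then return; the context factory's
    // stop() owns the actual context shutdown.
    snsc.awaitTerminationOrTimeout(30 * 1000)
  }

  override def isValidJob(snsc: SnappyStreamingContext, config: Config): SnappyJobValidation =
    SnappyJobValid()
}
```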