cluster/src/main/java/org/apache/spark/sql/JavaSnappySQLJob.java (42 changes: 0 additions & 42 deletions)

This file was deleted.
JavaSnappyStreamingJob.java:

@@ -18,28 +18,29 @@

 import com.typesafe.config.Config;
-import org.apache.spark.sql.JSparkJobValidation;
-import org.apache.spark.sql.JavaJobValidate;
-import org.apache.spark.sql.streaming.SnappyStreamingJob;
+import org.apache.spark.sql.SnappyJobValidate;
+import org.apache.spark.sql.SnappyJobValidation;
 import org.apache.spark.streaming.api.java.JavaSnappyStreamingContext;

+import spark.jobserver.SparkJobBase;
 import spark.jobserver.SparkJobValidation;
 import org.apache.spark.util.Utils;

-public abstract class JavaSnappyStreamingJob implements SnappyStreamingJob {
+public abstract class JavaSnappyStreamingJob implements SparkJobBase {

-  abstract public Object runJavaJob(JavaSnappyStreamingContext snc, Config jobConfig);
+  abstract public Object runSnappyJob(JavaSnappyStreamingContext snc, Config jobConfig);

-  abstract public JSparkJobValidation isValidJob(JavaSnappyStreamingContext snc,
+  abstract public SnappyJobValidation isValidJob(JavaSnappyStreamingContext snc,
       Config jobConfig);

   @Override
-  public Object runJob(Object sc, Config jobConfig) {
-    return runJavaJob(new JavaSnappyStreamingContext((SnappyStreamingContext)sc), jobConfig);
+  final public SparkJobValidation validate(Object sc, Config config) {
+    return SnappyJobValidate.validate(isValidJob(new JavaSnappyStreamingContext((SnappyStreamingContext)sc), config));
   }

   @Override
-  public SparkJobValidation validate(Object sc, Config config) {
-    JSparkJobValidation status =
-        isValidJob(new JavaSnappyStreamingContext((SnappyStreamingContext)sc), config);
-    return JavaJobValidate.validate(status);
+  final public Object runJob(Object sc, Config jobConfig) {
+    return runSnappyJob(new JavaSnappyStreamingContext((SnappyStreamingContext)sc), jobConfig);
   }
 }
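With this change, a Java streaming job supplies runSnappyJob and isValidJob, while the final runJob/validate adapters handle the raw-context cast and the validation translation. A minimal sketch of a subclass written against this contract (the class name and the trivial method bodies are invented for illustration; shown in Scala for consistency with the rest of the PR):

import com.typesafe.config.Config
import org.apache.spark.sql.{SnappyJobValid, SnappyJobValidation}
import org.apache.spark.streaming.JavaSnappyStreamingJob
import org.apache.spark.streaming.api.java.JavaSnappyStreamingContext

// Hypothetical job: name and bodies are made up for illustration.
class NoOpStreamingJob extends JavaSnappyStreamingJob {

  // The final runJob adapter wraps the raw context into a
  // JavaSnappyStreamingContext before delegating here.
  override def runSnappyJob(snc: JavaSnappyStreamingContext,
      jobConfig: Config): AnyRef = "done"

  // The final validate adapter converts this result into the
  // job-server's SparkJobValidation via SnappyJobValidate.
  override def isValidJob(snc: JavaSnappyStreamingContext,
      jobConfig: Config): SnappyJobValidation = SnappyJobValid()
}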
SnappySQLJob / SnappyContextFactory (package org.apache.spark.sql):

@@ -19,28 +19,10 @@ package org.apache.spark.sql
 import com.typesafe.config.Config
 import io.snappydata.impl.LeadImpl
 import spark.jobserver.context.SparkContextFactory
-import spark.jobserver.{SparkJobValid, SparkJobInvalid, SparkJobValidation, SparkJob, ContextLike, SparkJobBase}
+import spark.jobserver.{ContextLike, SparkJobBase, SparkJobInvalid, SparkJobValid, SparkJobValidation}

-import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.streaming.api.java.JavaDStream
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.SparkConf

-trait SnappySQLJob extends SparkJobBase {
-  type C = SnappyContext
-}
-
-object JavaJobValidate {
-  def validate(status: JSparkJobValidation): SparkJobValidation = {
-    status match {
-      case j: JSparkJobValid => SparkJobValid
-      case j: JSparkJobInvalid => SparkJobInvalid(j.reason)
-      case _ => SparkJobInvalid("isValid method is not correct")
-    }
-  }
-}
-trait JSparkJobValidation
-case class JSparkJobValid extends JSparkJobValidation
-case class JSparkJobInvalid(reason: String) extends JSparkJobValidation

 class SnappyContextFactory extends SparkContextFactory {

@@ -61,9 +43,42 @@ object SnappyContextFactory {
     new SnappyContext(snappyContextLike.sparkContext,
         snappyContextLike.listener,
         false) with ContextLike {
       override def isValidJob(job: SparkJobBase): Boolean = job.isInstanceOf[SnappySQLJob]
       override def stop(): Unit = {
         // not stopping anything here because SQLContext doesn't have one.
       }
     }
+
+abstract class SnappySQLJob extends SparkJobBase {
+  type C = Any
+
+  final override def validate(sc: C, config: Config): SparkJobValidation = {
+    SnappyJobValidate.validate(isValidJob(sc.asInstanceOf[SnappyContext], config))
+  }
+
+  final override def runJob(sc: C, jobConfig: Config): Any = {
+    runSnappyJob(sc.asInstanceOf[SnappyContext], jobConfig)
+  }
+
+  def isValidJob(sc: SnappyContext, config: Config): SnappyJobValidation
+
+  def runSnappyJob(sc: SnappyContext, jobConfig: Config): Any
+}
+
+object SnappyJobValidate {
+  def validate(status: SnappyJobValidation): SparkJobValidation = {
+    status match {
+      case j: SnappyJobValid => SparkJobValid
+      case j: SnappyJobInvalid => SparkJobInvalid(j.reason)
+      case _ => SparkJobInvalid("isValid method is not correct")
+    }
+  }
+}
+
+trait SnappyJobValidation
+case class SnappyJobValid() extends SnappyJobValidation
+case class SnappyJobInvalid(reason: String) extends SnappyJobValidation
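The Scala side mirrors the Java change: SnappySQLJob is now an abstract class whose final validate/runJob adapters cast the context and translate the SnappyJobValidation result, so user code only implements the two Snappy-flavored methods. A minimal sketch against this contract (the object name, the "table" config key, and the query are hypothetical):

import com.typesafe.config.Config
import org.apache.spark.sql.{SnappyContext, SnappyJobInvalid, SnappyJobValid,
  SnappyJobValidation, SnappySQLJob}

// Hypothetical job: counts rows of a table named in the job config.
object RowCountJob extends SnappySQLJob {

  // Called by the final validate adapter before the job is scheduled.
  override def isValidJob(sc: SnappyContext, config: Config): SnappyJobValidation =
    if (config.hasPath("table")) SnappyJobValid()
    else SnappyJobInvalid("Specify 'table' in the job config")

  // Called by the final runJob adapter with the context already cast.
  override def runSnappyJob(sc: SnappyContext, jobConfig: Config): Any =
    sc.sql("SELECT COUNT(*) FROM " + jobConfig.getString("table"))
      .collect()(0).getLong(0)
}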
SnappyStreamingJob / SnappyStreamingContextFactory (package org.apache.spark.sql.streaming):

@@ -19,13 +19,26 @@ package org.apache.spark.sql.streaming
 import com.typesafe.config.{ConfigException, Config}
 import io.snappydata.impl.LeadImpl
 import spark.jobserver.context.SparkContextFactory
-import spark.jobserver.{ContextLike, SparkJobBase}
+import spark.jobserver.{SparkJobValidation, ContextLike, SparkJobBase}

 import org.apache.spark.SparkConf
-import org.apache.spark.streaming.{Milliseconds, SnappyStreamingContext}
+import org.apache.spark.sql.{SnappyJobValidation, SnappyJobValidate}
+import org.apache.spark.streaming.{JavaSnappyStreamingJob, Milliseconds, SnappyStreamingContext}

-trait SnappyStreamingJob extends SparkJobBase {
-  type C = SnappyStreamingContext
+abstract class SnappyStreamingJob extends SparkJobBase {
+  override type C = SnappyStreamingContext
+
+  final override def validate(sc: C, config: Config): SparkJobValidation = {
+    SnappyJobValidate.validate(isValidJob(sc.asInstanceOf[SnappyStreamingContext], config))
+  }
+
+  final override def runJob(sc: C, jobConfig: Config): Any = {
+    runSnappyJob(sc.asInstanceOf[SnappyStreamingContext], jobConfig)
+  }
+
+  def isValidJob(sc: SnappyStreamingContext, config: Config): SnappyJobValidation
+
+  def runSnappyJob(sc: SnappyStreamingContext, jobConfig: Config): Any
 }

 class SnappyStreamingContextFactory extends SparkContextFactory {

@@ -38,7 +51,8 @@ class SnappyStreamingContextFactory extends SparkContextFactory {
     new SnappyStreamingContext(LeadImpl.getInitializingSparkContext,
         Milliseconds(interval)) with ContextLike {

-      override def isValidJob(job: SparkJobBase): Boolean = job.isInstanceOf[SnappyStreamingJob]
+      override def isValidJob(job: SparkJobBase): Boolean =
+        job.isInstanceOf[SnappyStreamingJob] || job.isInstanceOf[JavaSnappyStreamingJob]

       override def stop(): Unit = {
         try {

@@ -50,4 +64,4 @@
       }
     }
   }
-}
+}
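Streaming jobs get the same treatment against SnappyStreamingContext, and the factory's isValidJob gate now admits both the Scala and the Java base types. A hedged skeleton (the class name and the lifecycle calls in the body are assumptions, not taken from this diff):

import com.typesafe.config.Config
import org.apache.spark.sql.{SnappyJobValid, SnappyJobValidation}
import org.apache.spark.sql.streaming.SnappyStreamingJob
import org.apache.spark.streaming.SnappyStreamingContext

// Hypothetical skeleton: stream wiring is elided since it is not
// part of this diff.
class StreamingSkeletonJob extends SnappyStreamingJob {

  override def isValidJob(sc: SnappyStreamingContext,
      config: Config): SnappyJobValidation = SnappyJobValid()

  override def runSnappyJob(sc: SnappyStreamingContext,
      jobConfig: Config): Any = {
    // Register DStream transformations on sc, then start the context.
    // start()/awaitTermination() come from Spark's StreamingContext,
    // which SnappyStreamingContext extends (assumption).
    sc.start()
    sc.awaitTermination()
  }
}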
TPCH_Snappy_Query (package io.snappydata.benchmark.snappy):

@@ -1,3 +1,20 @@
+/*
+ * Copyright (c) 2016 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
 package io.snappydata.benchmark.snappy

 import scala.collection.JavaConverters._

@@ -6,12 +23,9 @@ import scala.language.implicitConversions
 import com.typesafe.config.Config
 import spark.jobserver.{SparkJobInvalid, SparkJobValid, SparkJobValidation}

-import org.apache.spark.sql.{SnappyContext, SnappySQLJob}
+import org.apache.spark.sql.{SnappyJobValid, SnappyJobInvalid, SnappyJobValidation, SnappyContext, SnappySQLJob}
 import org.apache.spark.{SparkConf, SparkContext}

-/**
- * Created by kishor on 28/1/16.
- */
 object TPCH_Snappy_Query extends SnappySQLJob{

@@ -23,7 +37,7 @@ object TPCH_Snappy_Query extends SnappySQLJob{
   var runsForAverage: Integer = _

-  override def runJob(snc: C, jobConfig: Config): Any = {
+  override def runSnappyJob(snc: SnappyContext, jobConfig: Config): Any = {

     jobConfig.entrySet().asScala.foreach(entry => if (entry.getKey.startsWith("spark.sql.")) {
       val entryString = entry.getKey + "=" + jobConfig.getString(entry.getKey)

@@ -89,7 +103,7 @@ object TPCH_Snappy_Query extends SnappySQLJob{
     runJob(snc, null)
   }

-  override def validate(sc: C, config: Config): SparkJobValidation = {
+  override def isValidJob(sc: SnappyContext, config: Config): SnappyJobValidation = {

     var sqlSparkProps = if (config.hasPath("sparkSqlProps")) {
       config.getString("sparkSqlProps")

@@ -101,7 +115,7 @@ object TPCH_Snappy_Query extends SnappySQLJob{
     var tempqueries = if (config.hasPath("queries")) {
       config.getString("queries")
     } else {
-      return new SparkJobInvalid("Specify Query number to be executed")
+      return new SnappyJobInvalid("Specify Query number to be executed")
     }

     println(s"tempqueries : $tempqueries")

@@ -110,25 +124,25 @@ object TPCH_Snappy_Query extends SnappySQLJob{
     useIndex = if (config.hasPath("useIndex")) {
       config.getBoolean("useIndex")
     } else {
-      return new SparkJobInvalid("Specify whether to use Index")
+      return new SnappyJobInvalid("Specify whether to use Index")
     }

     isResultCollection = if (config.hasPath("resultCollection")) {
       config.getBoolean("resultCollection")
     } else {
-      return new SparkJobInvalid("Specify whether to to collect results")
+      return new SnappyJobInvalid("Specify whether to collect results")
     }

     warmUp = if (config.hasPath("warmUpIterations")) {
       config.getInt("warmUpIterations")
     } else {
-      return new SparkJobInvalid("Specify number of warmup iterations")
+      return new SnappyJobInvalid("Specify number of warmup iterations")
     }
     runsForAverage = if (config.hasPath("actualRuns")) {
       config.getInt("actualRuns")
     } else {
-      return new SparkJobInvalid("Specify number of iterations of which average result is calculated")
+      return new SnappyJobInvalid("Specify number of iterations of which average result is calculated")
     }
-    SparkJobValid
+    new SnappyJobValid()
   }
 }
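The rewritten isValidJob reads its settings from the submitted job config. A sketch of a config that would pass this validation, assuming typesafe-config's ConfigFactory; the values are arbitrary, and sparkSqlProps remains optional:

import com.typesafe.config.{Config, ConfigFactory}

object TpchQueryConfSketch {
  // Keys are exactly the ones isValidJob above requires or reads;
  // the values are made up for illustration.
  val conf: Config = ConfigFactory.parseString("""
    queries = "1,6,13"
    useIndex = false
    resultCollection = true
    warmUpIterations = 2
    actualRuns = 5
    """)
}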
TPCH_Snappy_Tables:

@@ -22,7 +22,7 @@ import com.typesafe.config.Config
 import io.snappydata.benchmark.{TPCHColumnPartitionedTable, TPCHReplicatedTable}
 import spark.jobserver.{SparkJobInvalid, SparkJobValid, SparkJobValidation}

-import org.apache.spark.sql.SnappySQLJob
+import org.apache.spark.sql.{SnappyJobValid, SnappyJobInvalid, SnappyJobValidation, SnappyContext, SnappySQLJob}

 object TPCH_Snappy_Tables extends SnappySQLJob{

@@ -33,7 +33,7 @@ object TPCH_Snappy_Tables extends SnappySQLJob{
   var useIndex: Boolean = _
   var nation_Region_Supp_col = false

-  override def runJob(snc: C, jobConfig: Config): Any = {
+  override def runSnappyJob(snc: SnappyContext, jobConfig: Config): Any = {
     val props: Map[String, String] = null
     val isSnappy = true

@@ -63,7 +63,7 @@ object TPCH_Snappy_Tables extends SnappySQLJob{
     }
   }

-  override def validate(sc: C, config: Config): SparkJobValidation = {
+  override def isValidJob(sc: SnappyContext, config: Config): SnappyJobValidation = {

     tpchDataPath = if (config.hasPath("dataLocation")) {
       config.getString("dataLocation")

@@ -97,15 +97,15 @@ object TPCH_Snappy_Tables extends SnappySQLJob{
     }

     if (!(new File(tpchDataPath)).exists()) {
-      return new SparkJobInvalid("Incorrect tpch data path. " +
+      return new SnappyJobInvalid("Incorrect tpch data path. " +
           "Specify correct location")
     }

     useIndex = if (config.hasPath("useIndex")) {
       config.getBoolean("useIndex")
     } else {
-      return new SparkJobInvalid("Specify whether to use Index")
+      return new SnappyJobInvalid("Specify whether to use Index")
     }
-    SparkJobValid
+    SnappyJobValid()
   }
 }
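Both benchmark jobs now return SnappyJobValid/SnappyJobInvalid and rely on SnappyJobValidate to translate them back into the job-server's types. A runnable sketch of that mapping (the ValidateDemo wrapper is ours; the behavior follows the match in SnappyJobValidate above):

import org.apache.spark.sql.{SnappyJobInvalid, SnappyJobValid, SnappyJobValidate}

object ValidateDemo extends App {
  // Valid results map to SparkJobValid, invalid ones carry their reason
  // through to SparkJobInvalid.
  println(SnappyJobValidate.validate(SnappyJobValid()))         // SparkJobValid
  println(SnappyJobValidate.validate(SnappyJobInvalid("oops"))) // SparkJobInvalid(oops)
}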