Skip to content

Commit

Permalink
Merge pull request #42 from ishawakankar/3.1
Browse files Browse the repository at this point in the history
Added redis connection configuration in CommonUtil
  • Loading branch information
SanthoshVasabhaktula authored Jul 8, 2020
2 parents fcd4dc8 + 8b8ce06 commit 8d2c90e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object CommonUtil {
fc;
}

def getSparkContext(parallelization: Int, appName: String, sparkCassandraConnectionHost: Option[AnyRef] = None, sparkElasticsearchConnectionHost: Option[AnyRef] = None): SparkContext = {
def getSparkContext(parallelization: Int, appName: String, sparkCassandraConnectionHost: Option[AnyRef] = None, sparkElasticsearchConnectionHost: Option[AnyRef] = None, sparkRedisConnectionHost: Option[AnyRef] = None, sparkRedisDB: Option[AnyRef] = None): SparkContext = {
JobLogger.log("Initializing Spark Context")
val conf = new SparkConf().setAppName(appName).set("spark.default.parallelism", parallelization.toString)
.set("spark.driver.memory", AppConf.getConfig("spark.driver_memory"))
Expand Down Expand Up @@ -83,6 +83,12 @@ object CommonUtil {
conf.set("es.write.rest.error.handler.log.logger.level", "INFO")
}

if(sparkRedisConnectionHost.nonEmpty && sparkRedisDB.nonEmpty) {
conf.set("spark.redis.host", sparkRedisConnectionHost.get.asInstanceOf[String])
conf.set("spark.redis.port", "6379")
conf.set("spark.redis.db", sparkRedisDB.get.asInstanceOf[String])
}

val sc = new SparkContext(conf)
setS3Conf(sc)
setAzureConf(sc)
Expand All @@ -91,7 +97,8 @@ object CommonUtil {
}

def getSparkSession(parallelization: Int, appName: String, sparkCassandraConnectionHost: Option[AnyRef] = None,
sparkElasticsearchConnectionHost: Option[AnyRef] = None, readConsistencyLevel: Option[String] = None): SparkSession = {
sparkElasticsearchConnectionHost: Option[AnyRef] = None, readConsistencyLevel: Option[String] = None,
sparkRedisConnectionHost: Option[AnyRef] = None, sparkRedisDB: Option[AnyRef] = None): SparkSession = {
JobLogger.log("Initializing SparkSession")
val conf = new SparkConf().setAppName(appName).set("spark.default.parallelism", parallelization.toString)
.set("spark.driver.memory", AppConf.getConfig("spark.driver_memory"))
Expand All @@ -105,13 +112,13 @@ object CommonUtil {
}

if (!conf.contains("spark.cassandra.connection.host"))
conf.set("spark.cassandra.connection.host", AppConf.getConfig("spark.cassandra.connection.host"))
conf.set("spark.cassandra.connection.host", AppConf.getConfig("spark.cassandra.connection.host"))
// $COVERAGE-ON$

if (sparkCassandraConnectionHost.nonEmpty) {
conf.set("spark.cassandra.connection.host", sparkCassandraConnectionHost.get.asInstanceOf[String])
if (readConsistencyLevel.nonEmpty) {
conf.set("spark.cassandra.input.consistency.level", readConsistencyLevel.get);
conf.set("spark.cassandra.input.consistency.level", readConsistencyLevel.get)
}
println("setting spark.cassandra.connection.host to lp-cassandra", conf.get("spark.cassandra.connection.host"))
}
Expand All @@ -122,7 +129,12 @@ object CommonUtil {
conf.set("es.write.rest.error.handler.log.logger.name", "org.ekstep.es.dispatcher")
conf.set("es.write.rest.error.handler.log.logger.level", "INFO")
conf.set("es.write.operation", "upsert")
}

if(sparkRedisConnectionHost.nonEmpty && sparkRedisDB.nonEmpty) {
conf.set("spark.redis.host", sparkRedisConnectionHost.get.asInstanceOf[String])
conf.set("spark.redis.port", "6379")
conf.set("spark.redis.db", sparkRedisDB.get.asInstanceOf[String])
}

val sparkSession = SparkSession.builder().appName("sunbird-analytics").config(conf).getOrCreate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,11 +328,21 @@ class TestCommonUtil extends BaseSpec {
sc.stop();
}

noException should be thrownBy {
val sc = CommonUtil.getSparkContext(10, "Test", Option("10.0.0.0"), Option("10.0.0.0"), Option("10.0.0.0"), Option("2"));
sc.stop();
}

noException should be thrownBy {
val sc = CommonUtil.getSparkSession(10, "Test", Option("10.0.0.0"), Option("10.0.0.0"), Option("Quorum"))
sc.stop();
}

noException should be thrownBy {
val sc = CommonUtil.getSparkSession(10, "Test", Option("10.0.0.0"), Option("10.0.0.0"), Option("Quorum"), Option("10.0.0.0"), Option("2"))
sc.stop();
}

noException should be thrownBy {
val sc = CommonUtil.getSparkSession(10, "Test", Option("10.0.0.0"), Option("10.0.0.0"), None)
sc.stop();
Expand Down

0 comments on commit 8d2c90e

Please sign in to comment.