From 03715739a1a6d69cb6890f3b71a660e8f6d21954 Mon Sep 17 00:00:00 2001
From: Isha Wakankar
Date: Fri, 3 Jul 2020 16:49:50 +0530
Subject: [PATCH 1/4] Added redis connection configuration

---
 .../analytics/framework/util/CommonUtil.scala | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala
index 4d7b0976..587ca892 100644
--- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala
+++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala
@@ -54,7 +54,7 @@ object CommonUtil {
     fc;
   }
 
-  def getSparkContext(parallelization: Int, appName: String, sparkCassandraConnectionHost: Option[AnyRef] = None, sparkElasticsearchConnectionHost: Option[AnyRef] = None): SparkContext = {
+  def getSparkContext(parallelization: Int, appName: String, sparkCassandraConnectionHost: Option[AnyRef] = None, sparkElasticsearchConnectionHost: Option[AnyRef] = None, sparkRedisConnectionHost: Option[AnyRef] = None, sparkRedisDB: Option[AnyRef] = None): SparkContext = {
     JobLogger.log("Initializing Spark Context")
     val conf = new SparkConf().setAppName(appName).set("spark.default.parallelism", parallelization.toString)
       .set("spark.driver.memory", AppConf.getConfig("spark.driver_memory"))
@@ -83,6 +83,12 @@ object CommonUtil {
       conf.set("es.write.rest.error.handler.log.logger.level", "INFO")
     }
 
+    if(sparkRedisConnectionHost.nonEmpty && sparkRedisDB.nonEmpty) {
+      conf.set("spark.redis.host", sparkRedisConnectionHost.get.asInstanceOf[String])
+      conf.set("spark.redis.port", "6379")
+      conf.set("spark.redis.db", sparkRedisDB.get.asInstanceOf[String])
+    }
+
     val sc = new SparkContext(conf)
     setS3Conf(sc)
     setAzureConf(sc)
@@ -91,7 +97,8 @@ object CommonUtil {
   }
 
   def getSparkSession(parallelization: Int, appName: String, sparkCassandraConnectionHost: Option[AnyRef] = None,
-                      sparkElasticsearchConnectionHost: Option[AnyRef] = None, readConsistencyLevel: Option[String] = None): SparkSession = {
+                      sparkElasticsearchConnectionHost: Option[AnyRef] = None, readConsistencyLevel: Option[String] = None,
+                      sparkRedisConnectionHost: Option[AnyRef] = None, sparkRedisDB: Option[AnyRef] = None): SparkSession = {
     JobLogger.log("Initializing SparkSession")
     val conf = new SparkConf().setAppName(appName).set("spark.default.parallelism", parallelization.toString)
       .set("spark.driver.memory", AppConf.getConfig("spark.driver_memory"))
@@ -125,6 +132,12 @@ object CommonUtil {
 
     }
 
+    if(sparkRedisConnectionHost.nonEmpty && sparkRedisDB.nonEmpty) {
+      conf.set("spark.redis.host", sparkRedisConnectionHost.get.asInstanceOf[String])
+      conf.set("spark.redis.port", "6379")
+      conf.set("spark.redis.db", sparkRedisDB.get.asInstanceOf[String])
+    }
+
     val sparkSession = SparkSession.builder().appName("sunbird-analytics").config(conf).getOrCreate()
     setS3Conf(sparkSession.sparkContext)
     setAzureConf(sparkSession.sparkContext)
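Note: both new parameters default to None, so existing callers of getSparkContext and getSparkSession compile unchanged, and the Redis settings are applied only when the host and the db are both supplied. A minimal usage sketch follows (the app name and addresses are illustrative, not taken from the repo):

    // Hypothetical caller: requests a Redis-enabled context alongside Cassandra.
    val sc = CommonUtil.getSparkContext(10, "redis-enabled-job",
      sparkCassandraConnectionHost = Option("10.0.0.5"),
      sparkElasticsearchConnectionHost = None,
      sparkRedisConnectionHost = Option("10.0.0.6"),
      sparkRedisDB = Option("2"))
    // The resulting conf carries spark.redis.host=10.0.0.6 and spark.redis.db=2;
    // spark.redis.port is hard-coded to 6379 by this patch.
    sc.stop()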
--- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestCommonUtil.scala
+++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestCommonUtil.scala
@@ -328,11 +328,21 @@ class TestCommonUtil extends BaseSpec {
       sc.stop();
     }
 
+    noException should be thrownBy {
+      val sc = CommonUtil.getSparkContext(10, "Test", Option("10.0.0.0"), Option("10.0.0.0"), Option("10.0.0.0"), Option("2"));
+      sc.stop();
+    }
+
     noException should be thrownBy {
       val sc = CommonUtil.getSparkSession(10, "Test", Option("10.0.0.0"), Option("10.0.0.0"), Option("Quorum"))
       sc.stop();
     }
 
+    noException should be thrownBy {
+      val sc = CommonUtil.getSparkSession(10, "Test", Option("10.0.0.0"), Option("10.0.0.0"), Option("Quorum"), Option("10.0.0.0"), Option("2"))
+      sc.stop();
+    }
+
     noException should be thrownBy {
       val sc = CommonUtil.getSparkSession(10, "Test", Option("10.0.0.0"), Option("10.0.0.0"), None)
       sc.stop();
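Note: these additions only assert that context construction does not throw. A slightly stronger variant for the same spec would read the settings back before stopping the context (a sketch; it assumes ScalaTest Matchers are in scope, as they already are for the surrounding noException checks):

    val sc = CommonUtil.getSparkContext(10, "Test", Option("10.0.0.0"), Option("10.0.0.0"), Option("10.0.0.0"), Option("2"))
    // SparkContext.getConf returns a copy of the conf, so the values written by
    // CommonUtil can be verified directly.
    sc.getConf.get("spark.redis.host") should be("10.0.0.0")
    sc.getConf.get("spark.redis.db") should be("2")
    sc.stop()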
From 8d3f988462eb6dc82d9ffe8ad981f111c75e21e8 Mon Sep 17 00:00:00 2001
From: Isha Wakankar
Date: Tue, 7 Jul 2020 14:20:30 +0530
Subject: [PATCH 3/4] Removing duplicate code

---
 .../analytics/framework/util/CommonUtil.scala | 45 ++++++++++---------
 1 file changed, 24 insertions(+), 21 deletions(-)

diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala
index 587ca892..bbd964a5 100644
--- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala
+++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala
@@ -68,25 +68,25 @@ object CommonUtil {
     }
 
     if (!conf.contains("spark.cassandra.connection.host"))
-      conf.set("spark.cassandra.connection.host", AppConf.getConfig("spark.cassandra.connection.host"))
+      setSparkConf(conf, "spark.cassandra.connection.host", AppConf.getConfig("spark.cassandra.connection.host"))
     // $COVERAGE-ON$
     if (sparkCassandraConnectionHost.nonEmpty) {
-      conf.set("spark.cassandra.connection.host", sparkCassandraConnectionHost.get.asInstanceOf[String])
+      setSparkConf(conf, "spark.cassandra.connection.host", sparkCassandraConnectionHost.get.asInstanceOf[String])
       println("setting spark.cassandra.connection.host to lp-cassandra", conf.get("spark.cassandra.connection.host"))
     }
 
     if (sparkElasticsearchConnectionHost.nonEmpty) {
-      conf.set("es.nodes", sparkElasticsearchConnectionHost.get.asInstanceOf[String])
-      conf.set("es.port", "9200")
-      conf.set("es.write.rest.error.handler.log.logger.name", "org.ekstep.es.dispatcher")
-      conf.set("es.write.rest.error.handler.log.logger.level", "INFO")
+      setSparkConf(conf, "es.nodes", sparkElasticsearchConnectionHost.get.asInstanceOf[String])
+      setSparkConf(conf, "es.port", "9200")
+      setSparkConf(conf, "es.write.rest.error.handler.log.logger.name", "org.ekstep.es.dispatcher")
+      setSparkConf(conf, "es.write.rest.error.handler.log.logger.level", "INFO")
     }
 
     if(sparkRedisConnectionHost.nonEmpty && sparkRedisDB.nonEmpty) {
-      conf.set("spark.redis.host", sparkRedisConnectionHost.get.asInstanceOf[String])
-      conf.set("spark.redis.port", "6379")
-      conf.set("spark.redis.db", sparkRedisDB.get.asInstanceOf[String])
+      setSparkConf(conf, "spark.redis.host", sparkRedisConnectionHost.get.asInstanceOf[String])
+      setSparkConf(conf, "spark.redis.port", "6379")
+      setSparkConf(conf, "spark.redis.db", sparkRedisDB.get.asInstanceOf[String])
     }
 
     val sc = new SparkContext(conf)
@@ -112,30 +112,29 @@
     }
 
     if (!conf.contains("spark.cassandra.connection.host"))
-      conf.set("spark.cassandra.connection.host", AppConf.getConfig("spark.cassandra.connection.host"))
+      setSparkConf(conf, "spark.cassandra.connection.host", AppConf.getConfig("spark.cassandra.connection.host"))
     // $COVERAGE-ON$
     if (sparkCassandraConnectionHost.nonEmpty) {
-      conf.set("spark.cassandra.connection.host", sparkCassandraConnectionHost.get.asInstanceOf[String])
+      setSparkConf(conf, "spark.cassandra.connection.host", sparkCassandraConnectionHost.get.asInstanceOf[String])
       if (readConsistencyLevel.nonEmpty) {
-        conf.set("spark.cassandra.input.consistency.level", readConsistencyLevel.get);
+        setSparkConf(conf, "spark.cassandra.input.consistency.level", readConsistencyLevel.get)
       }
       println("setting spark.cassandra.connection.host to lp-cassandra", conf.get("spark.cassandra.connection.host"))
     }
 
     if (sparkElasticsearchConnectionHost.nonEmpty) {
-      conf.set("es.nodes", sparkElasticsearchConnectionHost.get.asInstanceOf[String])
-      conf.set("es.port", "9200")
-      conf.set("es.write.rest.error.handler.log.logger.name", "org.ekstep.es.dispatcher")
-      conf.set("es.write.rest.error.handler.log.logger.level", "INFO")
-      conf.set("es.write.operation", "upsert")
-
+      setSparkConf(conf, "es.nodes", sparkElasticsearchConnectionHost.get.asInstanceOf[String])
+      setSparkConf(conf, "es.port", "9200")
+      setSparkConf(conf, "es.write.rest.error.handler.log.logger.name", "org.ekstep.es.dispatcher")
+      setSparkConf(conf, "es.write.rest.error.handler.log.logger.level", "INFO")
+      setSparkConf(conf, "es.write.operation", "upsert")
     }
 
     if(sparkRedisConnectionHost.nonEmpty && sparkRedisDB.nonEmpty) {
-      conf.set("spark.redis.host", sparkRedisConnectionHost.get.asInstanceOf[String])
-      conf.set("spark.redis.port", "6379")
-      conf.set("spark.redis.db", sparkRedisDB.get.asInstanceOf[String])
+      setSparkConf(conf, "spark.redis.host", sparkRedisConnectionHost.get.asInstanceOf[String])
+      setSparkConf(conf, "spark.redis.port", "6379")
+      setSparkConf(conf, "spark.redis.db", sparkRedisDB.get.asInstanceOf[String])
     }
 
     val sparkSession = SparkSession.builder().appName("sunbird-analytics").config(conf).getOrCreate()
     setS3Conf(sparkSession.sparkContext)
     setAzureConf(sparkSession.sparkContext)
@@ -145,6 +144,10 @@
     sparkSession
   }
 
+  def setSparkConf(conf: SparkConf, key: String, value: String) {
+    conf.set(key, value)
+  }
+
   def setS3Conf(sc: SparkContext) = {
     JobLogger.log("Configuring S3 AccessKey& SecrateKey to SparkContext")
     sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AppConf.getAwsKey());
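Note: setSparkConf is a one-line pass-through to conf.set, so every call site stays as long as before; the repetition this patch targets is the wrapper name rather than the settings themselves. It also uses Scala's procedure syntax (def ... { }), which later Scala versions deprecate in favour of an explicit ": Unit =" result type. If the intent is to cut duplication, a bulk setter is one alternative (a hypothetical sketch, not part of this series; setAll and the demo values are invented for illustration):

    import org.apache.spark.SparkConf

    // Hypothetical helper: apply several settings in one call. SparkConf.set
    // returns the conf itself, so foldLeft threads it through each pair.
    def setAll(conf: SparkConf, settings: Map[String, String]): SparkConf =
      settings.foldLeft(conf) { case (c, (key, value)) => c.set(key, value) }

    setAll(new SparkConf().setAppName("demo"), Map(
      "es.port" -> "9200",
      "spark.redis.port" -> "6379"))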
From 8b8ce0684eae57f6aa9176f1d683d6168d08dc55 Mon Sep 17 00:00:00 2001
From: Isha Wakankar
Date: Tue, 7 Jul 2020 16:38:41 +0530
Subject: [PATCH 4/4] .

---
 .../analytics/framework/util/CommonUtil.scala | 44 +++++++++----------
 1 file changed, 20 insertions(+), 24 deletions(-)

diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala
index bbd964a5..c7678060 100644
--- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala
+++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/CommonUtil.scala
@@ -68,25 +68,25 @@ object CommonUtil {
     }
 
     if (!conf.contains("spark.cassandra.connection.host"))
-      setSparkConf(conf, "spark.cassandra.connection.host", AppConf.getConfig("spark.cassandra.connection.host"))
+      conf.set("spark.cassandra.connection.host", AppConf.getConfig("spark.cassandra.connection.host"))
     // $COVERAGE-ON$
     if (sparkCassandraConnectionHost.nonEmpty) {
-      setSparkConf(conf, "spark.cassandra.connection.host", sparkCassandraConnectionHost.get.asInstanceOf[String])
+      conf.set("spark.cassandra.connection.host", sparkCassandraConnectionHost.get.asInstanceOf[String])
       println("setting spark.cassandra.connection.host to lp-cassandra", conf.get("spark.cassandra.connection.host"))
     }
 
     if (sparkElasticsearchConnectionHost.nonEmpty) {
-      setSparkConf(conf, "es.nodes", sparkElasticsearchConnectionHost.get.asInstanceOf[String])
-      setSparkConf(conf, "es.port", "9200")
-      setSparkConf(conf, "es.write.rest.error.handler.log.logger.name", "org.ekstep.es.dispatcher")
-      setSparkConf(conf, "es.write.rest.error.handler.log.logger.level", "INFO")
+      conf.set("es.nodes", sparkElasticsearchConnectionHost.get.asInstanceOf[String])
+      conf.set("es.port", "9200")
+      conf.set("es.write.rest.error.handler.log.logger.name", "org.ekstep.es.dispatcher")
+      conf.set("es.write.rest.error.handler.log.logger.level", "INFO")
     }
 
     if(sparkRedisConnectionHost.nonEmpty && sparkRedisDB.nonEmpty) {
-      setSparkConf(conf, "spark.redis.host", sparkRedisConnectionHost.get.asInstanceOf[String])
-      setSparkConf(conf, "spark.redis.port", "6379")
-      setSparkConf(conf, "spark.redis.db", sparkRedisDB.get.asInstanceOf[String])
+      conf.set("spark.redis.host", sparkRedisConnectionHost.get.asInstanceOf[String])
+      conf.set("spark.redis.port", "6379")
+      conf.set("spark.redis.db", sparkRedisDB.get.asInstanceOf[String])
     }
 
     val sc = new SparkContext(conf)
@@ -112,29 +112,29 @@
     }
 
     if (!conf.contains("spark.cassandra.connection.host"))
-      setSparkConf(conf, "spark.cassandra.connection.host", AppConf.getConfig("spark.cassandra.connection.host"))
+      conf.set("spark.cassandra.connection.host", AppConf.getConfig("spark.cassandra.connection.host"))
     // $COVERAGE-ON$
     if (sparkCassandraConnectionHost.nonEmpty) {
-      setSparkConf(conf, "spark.cassandra.connection.host", sparkCassandraConnectionHost.get.asInstanceOf[String])
+      conf.set("spark.cassandra.connection.host", sparkCassandraConnectionHost.get.asInstanceOf[String])
       if (readConsistencyLevel.nonEmpty) {
-        setSparkConf(conf, "spark.cassandra.input.consistency.level", readConsistencyLevel.get)
+        conf.set("spark.cassandra.input.consistency.level", readConsistencyLevel.get)
      }
       println("setting spark.cassandra.connection.host to lp-cassandra", conf.get("spark.cassandra.connection.host"))
     }
 
     if (sparkElasticsearchConnectionHost.nonEmpty) {
-      setSparkConf(conf, "es.nodes", sparkElasticsearchConnectionHost.get.asInstanceOf[String])
-      setSparkConf(conf, "es.port", "9200")
-      setSparkConf(conf, "es.write.rest.error.handler.log.logger.name", "org.ekstep.es.dispatcher")
-      setSparkConf(conf, "es.write.rest.error.handler.log.logger.level", "INFO")
-      setSparkConf(conf, "es.write.operation", "upsert")
+      conf.set("es.nodes", sparkElasticsearchConnectionHost.get.asInstanceOf[String])
+      conf.set("es.port", "9200")
+      conf.set("es.write.rest.error.handler.log.logger.name", "org.ekstep.es.dispatcher")
+      conf.set("es.write.rest.error.handler.log.logger.level", "INFO")
+      conf.set("es.write.operation", "upsert")
     }
 
     if(sparkRedisConnectionHost.nonEmpty && sparkRedisDB.nonEmpty) {
-      setSparkConf(conf, "spark.redis.host", sparkRedisConnectionHost.get.asInstanceOf[String])
-      setSparkConf(conf, "spark.redis.port", "6379")
-      setSparkConf(conf, "spark.redis.db", sparkRedisDB.get.asInstanceOf[String])
+      conf.set("spark.redis.host", sparkRedisConnectionHost.get.asInstanceOf[String])
+      conf.set("spark.redis.port", "6379")
+      conf.set("spark.redis.db", sparkRedisDB.get.asInstanceOf[String])
     }
 
     val sparkSession = SparkSession.builder().appName("sunbird-analytics").config(conf).getOrCreate()
     setS3Conf(sparkSession.sparkContext)
     setAzureConf(sparkSession.sparkContext)
"es.write.rest.error.handler.log.logger.level", "INFO") - setSparkConf(conf, "es.write.operation", "upsert") + conf.set("es.nodes", sparkElasticsearchConnectionHost.get.asInstanceOf[String]) + conf.set("es.port", "9200") + conf.set("es.write.rest.error.handler.log.logger.name", "org.ekstep.es.dispatcher") + conf.set("es.write.rest.error.handler.log.logger.level", "INFO") + conf.set("es.write.operation", "upsert") } if(sparkRedisConnectionHost.nonEmpty && sparkRedisDB.nonEmpty) { - setSparkConf(conf, "spark.redis.host", sparkRedisConnectionHost.get.asInstanceOf[String]) - setSparkConf(conf, "spark.redis.port", "6379") - setSparkConf(conf, "spark.redis.db", sparkRedisDB.get.asInstanceOf[String]) + conf.set("spark.redis.host", sparkRedisConnectionHost.get.asInstanceOf[String]) + conf.set("spark.redis.port", "6379") + conf.set("spark.redis.db", sparkRedisDB.get.asInstanceOf[String]) } val sparkSession = SparkSession.builder().appName("sunbird-analytics").config(conf).getOrCreate() @@ -144,10 +144,6 @@ object CommonUtil { sparkSession } - def setSparkConf(conf: SparkConf, key: String, value: String) { - conf.set(key, value) - } - def setS3Conf(sc: SparkContext) = { JobLogger.log("Configuring S3 AccessKey& SecrateKey to SparkContext") sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", AppConf.getAwsKey());