diff --git a/etl-jobs/src/main/resources/cassandraRedis.conf b/etl-jobs/src/main/resources/cassandraRedis.conf
index 3cebe8942..f68f650d6 100644
--- a/etl-jobs/src/main/resources/cassandraRedis.conf
+++ b/etl-jobs/src/main/resources/cassandraRedis.conf
@@ -7,7 +7,7 @@ redis.connection.idle.max=20
 redis.connection.idle.min=10
 redis.connection.minEvictableIdleTimeSeconds=120
 redis.connection.timeBetweenEvictionRunsSeconds=300
-redis.max.pipeline.size="1000"
+redis.max.pipeline.size="100000"
 
 #CassandraToRedis Config
diff --git a/etl-jobs/src/main/scala/org/sunbird/analytics/jobs/UserCacheIndexer.scala b/etl-jobs/src/main/scala/org/sunbird/analytics/jobs/UserCacheIndexer.scala
index fc1b90f62..fbe0e498a 100644
--- a/etl-jobs/src/main/scala/org/sunbird/analytics/jobs/UserCacheIndexer.scala
+++ b/etl-jobs/src/main/scala/org/sunbird/analytics/jobs/UserCacheIndexer.scala
@@ -1,10 +1,11 @@
 package org.sunbird.analytics.jobs
 
-import com.typesafe.config.{Config, ConfigFactory}
+import com.typesafe.config.{ Config, ConfigFactory }
 import org.apache.commons.lang.StringUtils
-import org.apache.spark.sql.functions.{col, collect_set, concat_ws, explode_outer, first, lit, lower, _}
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.functions.{ col, collect_set, concat_ws, explode_outer, first, lit, lower, _ }
+import org.apache.spark.sql.{ DataFrame, SaveMode, SparkSession }
 import org.apache.spark.sql.functions.to_json
+import org.apache.spark.storage.StorageLevel
 
 object UserCacheIndexer extends Serializable {
 
@@ -26,7 +27,7 @@ object UserCacheIndexer extends Serializable {
     SparkSession
       .builder()
       .appName("AppName")
-      .config("spark.master", "local")
+      .config("spark.master", "local[*]")
       .config("spark.cassandra.connection.host", config.getString("spark.cassandra.connection.host"))
       .config("spark.redis.host", config.getString("redis.host"))
       .config("spark.redis.port", config.getString("redis.port"))
@@ -48,35 +49,30 @@ object UserCacheIndexer extends Serializable {
       }
     }
 
-    def denormUserData(): Unit = {
+    def populateUserData(): Unit = {
 
       val userDF = filterUserData(spark.read.format("org.apache.spark.sql.cassandra").option("table", "user").option("keyspace", sunbirdKeyspace).load()
-        // Flattening the BGMS
-        .withColumn("medium", explode_outer(col("framework.medium")))
+        .filter(col("userid").isNotNull))
+        .withColumn("medium", explode_outer(col("framework.medium"))) // Flattening the BGMS
         .withColumn("subject", explode_outer(col("framework.subject")))
         .withColumn("board", explode_outer(col("framework.board")))
         .withColumn("grade", explode_outer(col("framework.gradeLevel")))
         .withColumn("framework_id", explode_outer(col("framework.id")))
-        .drop("framework"))
+        .drop("framework")
+        .withColumnRenamed("framework_id", "framework")
+        .persist(StorageLevel.MEMORY_ONLY)
+
+      Console.println("User records count:", userDF.count())
+      val res1 = time(populateToRedis(userDF)) // Insert all user data into Redis
+      Console.println("Time taken to insert user records", res1._1)
 
       val userOrgDF = spark.read.format("org.apache.spark.sql.cassandra").option("table", "user_org").option("keyspace", sunbirdKeyspace).load().filter(lower(col("isdeleted")) === "false")
-        .select(col("userid"), col("organisationid")).persist()
+        .select(col("userid"), col("organisationid"))
 
       val organisationDF = spark.read.format("org.apache.spark.sql.cassandra").option("table", "organisation").option("keyspace", sunbirdKeyspace).load()
         .select(col("id"), col("orgname"), col("channel"), col("orgcode"),
col("locationids"), col("isrootorg")).persist() - - val locationDF = spark.read.format("org.apache.spark.sql.cassandra").option("table", "location").option("keyspace", sunbirdKeyspace).load() - - val externalIdentityDF = spark.read.format("org.apache.spark.sql.cassandra").option("table", "usr_external_identity").option("keyspace", sunbirdKeyspace).load() - .select(col("provider"), col("idtype"), col("externalid"), col("userid")).persist() - // Get CustodianOrgID + col("locationids"), col("isrootorg")) - val custRootOrgId = getCustodianOrgId() - val custodianUserDF = generateCustodianOrgUserData(custRootOrgId, userDF, organisationDF, locationDF, externalIdentityDF) - val stateUserDF = generateStateOrgUserData(custRootOrgId, userDF, organisationDF, locationDF, externalIdentityDF, userOrgDF) - - val userLocationResolvedDF = custodianUserDF.unionByName(stateUserDF) // UserLocation /** * Get a union of RootOrg and SubOrg information for a User */ @@ -90,34 +86,74 @@ object UserCacheIndexer extends Serializable { val rootOnlyOrgDF = userRootOrgDF.join(userSubOrgDF, Seq("userid"), "leftanti").select(userRootOrgDF.col("*")) val userOrgDenormDF = rootOnlyOrgDF.union(userSubOrgDF) - + /** * Resolve organization name for a RootOrg */ val resolvedOrgNameDF = userOrgDenormDF .join(organisationDF, organisationDF.col("id") === userOrgDenormDF.col("rootorgid"), "left_outer") .groupBy("userid") - .agg(concat_ws(",", collect_set("orgname")).as("orgname")) + .agg(concat_ws(",", collect_set("orgname")).as("orgname_resolved")) + val filteredOrgDf = resolvedOrgNameDF.select(col("userid"), col("orgname_resolved").as("orgname")) + + val res2 = time(populateToRedis(filteredOrgDf)) + userDF.unpersist(); + Console.println("Time taken to insert user org records", res2._1) + } - val userDataDF = userLocationResolvedDF - .join(resolvedOrgNameDF, Seq("userid"), "left") - .withColumn("framework", col("framework_id")) - .drop("framework_id") + def denormUserData(): Unit = { - populateToRedis(userDataDF) + val userDF = filterUserData(spark.read.format("org.apache.spark.sql.cassandra").option("table", "user").option("keyspace", sunbirdKeyspace).load() + .filter(col("userid").isNotNull)) + .select("userid", "locationids", "rootorgid","channel").persist(StorageLevel.MEMORY_ONLY) - userOrgDF.unpersist() - organisationDF.unpersist() - locationDF.unpersist() - externalIdentityDF.unpersist() + val userOrgDF = spark.read.format("org.apache.spark.sql.cassandra").option("table", "user_org").option("keyspace", sunbirdKeyspace).load().filter(lower(col("isdeleted")) === "false") + .select(col("userid"), col("organisationid")).persist(StorageLevel.MEMORY_ONLY) + + val organisationDF = spark.read.format("org.apache.spark.sql.cassandra").option("table", "organisation").option("keyspace", sunbirdKeyspace).load() + .select(col("id"), col("orgname"), col("channel"), col("orgcode"), col("locationids"), col("isrootorg")).persist(StorageLevel.MEMORY_ONLY) + + val locationDF = spark.read.format("org.apache.spark.sql.cassandra").option("table", "location").option("keyspace", sunbirdKeyspace).load().persist(StorageLevel.MEMORY_ONLY) + + val externalIdentityDF = spark.read.format("org.apache.spark.sql.cassandra").option("table", "usr_external_identity").option("keyspace", sunbirdKeyspace).load() + .select(col("provider"), col("idtype"), col("externalid"), col("userid")) + + // Get CustodianOrgID + val custRootOrgId = getCustodianOrgId() + Console.println("#### custRootOrgId ####", custRootOrgId) + + val custodianUserDF = 
+      val custodianUserDF = generateCustodianOrgUserData(custRootOrgId, userDF, organisationDF, locationDF, externalIdentityDF)
+
+      val filteredCustodianDF = custodianUserDF.select(
+        col("declared-school-name").as("schoolname"),
+        col("declared-ext-id").as("externalid"),
+        col("state_name").as("state"),
+        col("district"), col("block"),
+        col("declared-school-udise-code").as("schooludisecode"),
+        col("user_channel").as("userchannel"), col("userid"), col("usersignintype"))
+
+      val res1 = time(populateToRedis(filteredCustodianDF.distinct()))
+      Console.println("Time taken to insert custodian details", res1._1)
+
+      val stateUserDF = generateStateOrgUserData(custRootOrgId, userDF, organisationDF, locationDF, externalIdentityDF, userOrgDF)
+
+      val filteredStateDF = stateUserDF.select(
+        col("declared-school-name").as("schoolname"),
+        col("declared-ext-id").as("externalid"),
+        col("state_name").as("state"),
+        col("district"), col("block"),
+        col("declared-school-udise-code").as("schooludisecode"),
+        col("user_channel").as("userchannel"), col("userid"), col("usersignintype"))
+
+      val res2 = time(populateToRedis(filteredStateDF.distinct()))
+      Console.println("Time taken to insert state user details", res2._1)
 
       userDF.unpersist()
-      }
+    }
 
     def getCustodianOrgId(): String = {
       val systemSettingDF = spark.read.format("org.apache.spark.sql.cassandra").option("table", "system_settings").option("keyspace", sunbirdKeyspace).load()
-      val df = systemSettingDF.where(col("id") === "custodianOrgId" && col("field") === "custodianOrgId")
-        .select(col("value")).persist()
+      val df = systemSettingDF.where(col("id") === "custodianOrgId" && col("field") === "custodianOrgId").select(col("value"))
       df.select("value").first().getString(0)
     }
 
@@ -132,7 +168,6 @@ object UserCacheIndexer extends Serializable {
         .withColumn("exploded_location", explode_outer(col("locationids")))
         .select(col("userid"), col("exploded_location"), col("locationids"))
 
-
       val userStateDF = userExplodedLocationDF
         .join(locationDF, col("exploded_location") === locationDF.col("id") && locationDF.col("type") === "state")
         .select(userExplodedLocationDF.col("userid"), col("name").as("state_name"))
@@ -153,11 +188,12 @@
         .join(userStateDF, Seq("userid"), "inner")
         .join(userDistrictDF, Seq("userid"), "left")
         .join(userBlockDF, Seq("userid"), "left")
-        .select(userDF.col("*"),
-          col("state_name").as("state"),
+        .select(
+          userDF.col("*"),
+          col("state_name"),
           col("district"),
           col("block"))
-      .withColumn("usersignintype", lit("Self-Signed-In"))
+        .withColumn("usersignintype", lit("Self-Signed-In"))
       //  .drop(col("locationids"))
 
       val custodianUserPivotDF = custodianOrguserLocationDF
@@ -167,20 +203,16 @@
         .groupBy(custodianOrguserLocationDF.col("userid"), organisationDF.col("id"))
         .pivot("idtype", Seq("declared-ext-id", "declared-school-name", "declared-school-udise-code"))
         .agg(first(col("externalid")))
-        .select(custodianOrguserLocationDF.col("userid"),
+        .select(
+          custodianOrguserLocationDF.col("userid"),
           col("declared-ext-id"),
           col("declared-school-name"),
           col("declared-school-udise-code"),
-          organisationDF.col("id").as("userchannel"))
+          organisationDF.col("id").as("user_channel"))
 
       val custodianUserDF = custodianOrguserLocationDF.as("userLocDF")
         .join(custodianUserPivotDF, Seq("userid"), "left")
-        .select(col("userLocDF.*"),
-          col("declared-ext-id"),
-          col("declared-school-name").as("schoolname"),
-          col("declared-school-udise-code").as("schooludisecode"),
-          col("userchannel"))
-
.select("userLocDF.*", "declared-ext-id", "declared-school-name", "declared-school-udise-code", "user_channel") custodianUserDF } @@ -218,10 +250,11 @@ object UserCacheIndexer extends Serializable { val stateUserLocationResolvedDF = userDF.filter(col("rootorgid") =!= lit(custRootOrgId)) .join(subOrgDF, Seq("userid"), "left") - .select(userDF.col("*"), - subOrgDF.col("orgname").as("schoolname"), - subOrgDF.col("orgcode").as("schooludisecode"), - subOrgDF.col("state_name").as("state"), + .select( + userDF.col("*"), + subOrgDF.col("orgname").as("declared-school-name"), + subOrgDF.col("orgcode").as("declared-school-udise-code"), + subOrgDF.col("state_name"), subOrgDF.col("district"), subOrgDF.col("block")) .withColumn("usersignintype", lit("Validated")) @@ -231,12 +264,7 @@ object UserCacheIndexer extends Serializable { .join(externalIdentityDF, externalIdentityDF.col("idtype") === col("state_user.channel") && externalIdentityDF.col("provider") === col("state_user.channel") && externalIdentityDF.col("userid") === col("state_user.userid"), "left") - .select( - col("state_user.*"), - externalIdentityDF.col("externalid").as("declared-ext-id"), - col("rootorgid").as("userchannel") - ) - + .select(col("state_user.*"), externalIdentityDF.col("externalid").as("declared-ext-id"), col("rootorgid").as("user_channel")) stateUserDF } @@ -246,8 +274,7 @@ object UserCacheIndexer extends Serializable { val complexFields = schema.fields.filter(field => complexFieldTypes.contains(field.dataType.typeName)) val resultDF = complexFields.foldLeft(filteredDF)((df, field) => - df.withColumn(field.name, to_json(col(field.name))) - ) + df.withColumn(field.name, to_json(col(field.name)))) resultDF.write .format("org.apache.spark.sql.redis") @@ -257,8 +284,18 @@ object UserCacheIndexer extends Serializable { .save() } - denormUserData() + val res1 = time(populateUserData()) + val res2 = time(denormUserData()) + val totalTimeTaken = (res1._1 + res2._1).toDouble/1000 + Console.println("Time taken for individual steps:", "stage1", res1._1, "stage2", res2._1) + Console.println("Time taken for complete script:", totalTimeTaken); } + def time[R](block: => R): (Long, R) = { + val t0 = System.currentTimeMillis() + val result = block // call-by-name + val t1 = System.currentTimeMillis() + ((t1 - t0), result) + } }