Commit
Merge pull request #181 from SanthoshVasabhaktula/release-3.1.0
#000 - Fix the user cache indexer script
SanthoshVasabhaktula authored Aug 10, 2020
2 parents aba433e + 1c125f6 commit 45d4855
Showing 2 changed files with 98 additions and 61 deletions.
2 changes: 1 addition & 1 deletion etl-jobs/src/main/resources/cassandraRedis.conf
@@ -7,7 +7,7 @@ redis.connection.idle.max=20
redis.connection.idle.min=10
redis.connection.minEvictableIdleTimeSeconds=120
redis.connection.timeBetweenEvictionRunsSeconds=300
-redis.max.pipeline.size="1000"
+redis.max.pipeline.size="100000"


#CassandraToRedis Config
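Note: raising redis.max.pipeline.size from 1000 to 100000 lets the spark-redis writer batch far more commands per pipeline flush, cutting round trips when inserting large user caches. The key is read through Typesafe Config, the same mechanism the indexer uses for redis.host and redis.port. A minimal sketch of reading it, where the cassandraRedis resource name and the RedisConfSketch object are illustrative assumptions:

    import com.typesafe.config.{ Config, ConfigFactory }

    object RedisConfSketch {
      def main(args: Array[String]): Unit = {
        // Loads cassandraRedis.conf from the classpath.
        val config: Config = ConfigFactory.load("cassandraRedis")
        // The value is quoted in the .conf file, so it is read as a string.
        val maxPipelineSize = config.getString("redis.max.pipeline.size")
        println(s"redis.max.pipeline.size = $maxPipelineSize")
      }
    }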
157 changes: 97 additions & 60 deletions etl-jobs/src/main/scala/org/sunbird/analytics/jobs/UserCacheIndexer.scala
@@ -1,10 +1,11 @@
package org.sunbird.analytics.jobs

-import com.typesafe.config.{Config, ConfigFactory}
+import com.typesafe.config.{ Config, ConfigFactory }
import org.apache.commons.lang.StringUtils
-import org.apache.spark.sql.functions.{col, collect_set, concat_ws, explode_outer, first, lit, lower, _}
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.functions.{ col, collect_set, concat_ws, explode_outer, first, lit, lower, _ }
+import org.apache.spark.sql.{ DataFrame, SaveMode, SparkSession }
import org.apache.spark.sql.functions.to_json
+import org.apache.spark.storage.StorageLevel

object UserCacheIndexer extends Serializable {

@@ -26,7 +27,7 @@ object UserCacheIndexer extends Serializable {
SparkSession
.builder()
.appName("AppName")
.config("spark.master", "local")
.config("spark.master", "local[*]")
.config("spark.cassandra.connection.host", config.getString("spark.cassandra.connection.host"))
.config("spark.redis.host", config.getString("redis.host"))
.config("spark.redis.port", config.getString("redis.port"))
@@ -48,35 +49,30 @@ object UserCacheIndexer extends Serializable {
}
}

-  def denormUserData(): Unit = {
+  def populateUserData() {

val userDF = filterUserData(spark.read.format("org.apache.spark.sql.cassandra").option("table", "user").option("keyspace", sunbirdKeyspace).load()
-      // Flattening the BGMS
-      .withColumn("medium", explode_outer(col("framework.medium")))
      .filter(col("userid").isNotNull))
+      .withColumn("medium", explode_outer(col("framework.medium"))) // Flattening the BGMS
.withColumn("subject", explode_outer(col("framework.subject")))
.withColumn("board", explode_outer(col("framework.board")))
.withColumn("grade", explode_outer(col("framework.gradeLevel")))
.withColumn("framework_id", explode_outer(col("framework.id")))
.drop("framework"))
.drop("framework")
.withColumnRenamed("framework_id", "framework")
.persist(StorageLevel.MEMORY_ONLY)

Console.println("User records count:", userDF.count());
val res1 = time(populateToRedis(userDF)) // Insert all userData Into redis
Console.println("Time taken to insert user records", res1._1)

val userOrgDF = spark.read.format("org.apache.spark.sql.cassandra").option("table", "user_org").option("keyspace", sunbirdKeyspace).load().filter(lower(col("isdeleted")) === "false")
.select(col("userid"), col("organisationid")).persist()
.select(col("userid"), col("organisationid"))

val organisationDF = spark.read.format("org.apache.spark.sql.cassandra").option("table", "organisation").option("keyspace", sunbirdKeyspace).load()
.select(col("id"), col("orgname"), col("channel"), col("orgcode"),
col("locationids"), col("isrootorg")).persist()

val locationDF = spark.read.format("org.apache.spark.sql.cassandra").option("table", "location").option("keyspace", sunbirdKeyspace).load()

val externalIdentityDF = spark.read.format("org.apache.spark.sql.cassandra").option("table", "usr_external_identity").option("keyspace", sunbirdKeyspace).load()
.select(col("provider"), col("idtype"), col("externalid"), col("userid")).persist()
// Get CustodianOrgID
col("locationids"), col("isrootorg"))

-    val custRootOrgId = getCustodianOrgId()
-    val custodianUserDF = generateCustodianOrgUserData(custRootOrgId, userDF, organisationDF, locationDF, externalIdentityDF)
-    val stateUserDF = generateStateOrgUserData(custRootOrgId, userDF, organisationDF, locationDF, externalIdentityDF, userOrgDF)

-    val userLocationResolvedDF = custodianUserDF.unionByName(stateUserDF) // UserLocation
/**
* Get a union of RootOrg and SubOrg information for a User
*/
@@ -90,34 +86,74 @@

val rootOnlyOrgDF = userRootOrgDF.join(userSubOrgDF, Seq("userid"), "leftanti").select(userRootOrgDF.col("*"))
val userOrgDenormDF = rootOnlyOrgDF.union(userSubOrgDF)

/**
* Resolve organization name for a RootOrg
*/
val resolvedOrgNameDF = userOrgDenormDF
.join(organisationDF, organisationDF.col("id") === userOrgDenormDF.col("rootorgid"), "left_outer")
.groupBy("userid")
-      .agg(concat_ws(",", collect_set("orgname")).as("orgname"))
+      .agg(concat_ws(",", collect_set("orgname")).as("orgname_resolved"))
+    val filteredOrgDf = resolvedOrgNameDF.select(col("userid"), col("orgname_resolved").as("orgname"))

+    val res2 = time(populateToRedis(filteredOrgDf))
+    userDF.unpersist();
+    Console.println("Time taken to insert user org records", res2._1)
+  }

-    val userDataDF = userLocationResolvedDF
-      .join(resolvedOrgNameDF, Seq("userid"), "left")
-      .withColumn("framework", col("framework_id"))
-      .drop("framework_id")
+  def denormUserData(): Unit = {

-    populateToRedis(userDataDF)
+    val userDF = filterUserData(spark.read.format("org.apache.spark.sql.cassandra").option("table", "user").option("keyspace", sunbirdKeyspace).load()
+      .filter(col("userid").isNotNull))
+      .select("userid", "locationids", "rootorgid","channel").persist(StorageLevel.MEMORY_ONLY)

-    userOrgDF.unpersist()
-    organisationDF.unpersist()
-    locationDF.unpersist()
-    externalIdentityDF.unpersist()
+    val userOrgDF = spark.read.format("org.apache.spark.sql.cassandra").option("table", "user_org").option("keyspace", sunbirdKeyspace).load().filter(lower(col("isdeleted")) === "false")
+      .select(col("userid"), col("organisationid")).persist(StorageLevel.MEMORY_ONLY)

+    val organisationDF = spark.read.format("org.apache.spark.sql.cassandra").option("table", "organisation").option("keyspace", sunbirdKeyspace).load()
+      .select(col("id"), col("orgname"), col("channel"), col("orgcode"), col("locationids"), col("isrootorg")).persist(StorageLevel.MEMORY_ONLY)

+    val locationDF = spark.read.format("org.apache.spark.sql.cassandra").option("table", "location").option("keyspace", sunbirdKeyspace).load().persist(StorageLevel.MEMORY_ONLY)

+    val externalIdentityDF = spark.read.format("org.apache.spark.sql.cassandra").option("table", "usr_external_identity").option("keyspace", sunbirdKeyspace).load()
+      .select(col("provider"), col("idtype"), col("externalid"), col("userid"))

+    // Get CustodianOrgID
+    val custRootOrgId = getCustodianOrgId()
+    Console.println("#### custRootOrgId ####", custRootOrgId)

+    val custodianUserDF = generateCustodianOrgUserData(custRootOrgId, userDF, organisationDF, locationDF, externalIdentityDF)

+    val filteredCustoDian = custodianUserDF.select(
+      col("declared-school-name").as("schoolname"),
+      col("declared-ext-id").as("externalid"),
+      col("state_name").as("state"),
+      col("district"), col("block"),
+      col("declared-school-udise-code").as("schooludisecode"),
+      col("user_channel").as("userchannel"), col("userid"), col("usersignintype"))

+    val res1 = time(populateToRedis(filteredCustoDian.distinct()))
+    Console.println("Time taken to insert custodian details", res1._1)

+    val stateUserDF = generateStateOrgUserData(custRootOrgId, userDF, organisationDF, locationDF, externalIdentityDF, userOrgDF)

+    val filteredStateDF = stateUserDF.select(
+      col("declared-school-name").as("schoolname"),
+      col("declared-ext-id").as("externalid"),
+      col("state_name").as("state"),
+      col("district"), col("block"),
+      col("declared-school-udise-code").as("schooludisecode"),
+      col("user_channel").as("userchannel"), col("userid"), col("usersignintype"))

+    val res2 = time(populateToRedis(filteredStateDF.distinct()))
+    Console.println("Time taken to insert state user details", res2._1)
+    userDF.unpersist()
+  }

}

def getCustodianOrgId(): String = {
val systemSettingDF = spark.read.format("org.apache.spark.sql.cassandra").option("table", "system_settings").option("keyspace", sunbirdKeyspace).load()
-    val df = systemSettingDF.where(col("id") === "custodianOrgId" && col("field") === "custodianOrgId")
-      .select(col("value")).persist()
+    val df = systemSettingDF.where(col("id") === "custodianOrgId" && col("field") === "custodianOrgId").select(col("value"))
df.select("value").first().getString(0)
}

@@ -132,7 +168,6 @@ object UserCacheIndexer extends Serializable {
.withColumn("exploded_location", explode_outer(col("locationids")))
.select(col("userid"), col("exploded_location"), col("locationids"))


val userStateDF = userExplodedLocationDF
.join(locationDF, col("exploded_location") === locationDF.col("id") && locationDF.col("type") === "state")
.select(userExplodedLocationDF.col("userid"), col("name").as("state_name"))
@@ -153,11 +188,12 @@ object UserCacheIndexer extends Serializable {
.join(userStateDF, Seq("userid"), "inner")
.join(userDistrictDF, Seq("userid"), "left")
.join(userBlockDF, Seq("userid"), "left")
.select(userDF.col("*"),
col("state_name").as("state"),
.select(
userDF.col("*"),
col("state_name"),
col("district"),
col("block"))
.withColumn("usersignintype", lit("Self-Signed-In"))
.withColumn("usersignintype", lit("Self-Signed-In"))
// .drop(col("locationids"))

val custodianUserPivotDF = custodianOrguserLocationDF
@@ -167,20 +203,16 @@ object UserCacheIndexer extends Serializable {
.groupBy(custodianOrguserLocationDF.col("userid"), organisationDF.col("id"))
.pivot("idtype", Seq("declared-ext-id", "declared-school-name", "declared-school-udise-code"))
.agg(first(col("externalid")))
-      .select(custodianOrguserLocationDF.col("userid"),
+      .select(
+        custodianOrguserLocationDF.col("userid"),
        col("declared-ext-id"),
        col("declared-school-name"),
        col("declared-school-udise-code"),
-        organisationDF.col("id").as("userchannel"))
+        organisationDF.col("id").as("user_channel"))

val custodianUserDF = custodianOrguserLocationDF.as("userLocDF")
.join(custodianUserPivotDF, Seq("userid"), "left")
.select(col("userLocDF.*"),
col("declared-ext-id"),
col("declared-school-name").as("schoolname"),
col("declared-school-udise-code").as("schooludisecode"),
col("userchannel"))

.select("userLocDF.*", "declared-ext-id", "declared-school-name", "declared-school-udise-code", "user_channel")
custodianUserDF
}

@@ -218,10 +250,11 @@ object UserCacheIndexer extends Serializable {

val stateUserLocationResolvedDF = userDF.filter(col("rootorgid") =!= lit(custRootOrgId))
.join(subOrgDF, Seq("userid"), "left")
.select(userDF.col("*"),
subOrgDF.col("orgname").as("schoolname"),
subOrgDF.col("orgcode").as("schooludisecode"),
subOrgDF.col("state_name").as("state"),
.select(
userDF.col("*"),
subOrgDF.col("orgname").as("declared-school-name"),
subOrgDF.col("orgcode").as("declared-school-udise-code"),
subOrgDF.col("state_name"),
subOrgDF.col("district"),
subOrgDF.col("block"))
.withColumn("usersignintype", lit("Validated"))
@@ -231,12 +264,7 @@ object UserCacheIndexer extends Serializable {
.join(externalIdentityDF, externalIdentityDF.col("idtype") === col("state_user.channel")
&& externalIdentityDF.col("provider") === col("state_user.channel")
&& externalIdentityDF.col("userid") === col("state_user.userid"), "left")
-        .select(
-          col("state_user.*"),
-          externalIdentityDF.col("externalid").as("declared-ext-id"),
-          col("rootorgid").as("userchannel")
-        )
-
+        .select(col("state_user.*"), externalIdentityDF.col("externalid").as("declared-ext-id"), col("rootorgid").as("user_channel"))
stateUserDF
}

@@ -246,8 +274,7 @@ object UserCacheIndexer extends Serializable {
val complexFields = schema.fields.filter(field => complexFieldTypes.contains(field.dataType.typeName))

val resultDF = complexFields.foldLeft(filteredDF)((df, field) =>
-      df.withColumn(field.name, to_json(col(field.name)))
-    )
+      df.withColumn(field.name, to_json(col(field.name))))

resultDF.write
.format("org.apache.spark.sql.redis")
@@ -257,8 +284,18 @@ object UserCacheIndexer extends Serializable {
.save()
}

-    denormUserData()
+    val res1 = time(populateUserData())
+    val res2 = time(denormUserData())
+    val totalTimeTaken = (res1._1 + res2._1).toDouble/1000
+    Console.println("Time taken for individual steps:", "stage1", res1._1, "stage2", res2._1)
+    Console.println("Time taken for complete script:", totalTimeTaken);
}

+  def time[R](block: => R): (Long, R) = {
+    val t0 = System.currentTimeMillis()
+    val result = block // call-by-name
+    val t1 = System.currentTimeMillis()
+    ((t1 - t0), result)
+  }

}
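Note: the restructured entry point now times each stage with the new time helper, a standard call-by-name timing wrapper: the block is evaluated exactly once, between the two timestamps, and the elapsed milliseconds come back alongside the block's result. A self-contained sketch of the same pattern, where the TimingSketch object and the summing workload are illustrative:

    object TimingSketch {
      // Same shape as the helper added in this commit: `block` is passed
      // by name, so it runs once, inside the timer.
      def time[R](block: => R): (Long, R) = {
        val t0 = System.currentTimeMillis()
        val result = block
        val t1 = System.currentTimeMillis()
        ((t1 - t0), result)
      }

      def main(args: Array[String]): Unit = {
        val (elapsedMs, total) = time { (1 to 1000000).map(_.toLong).sum }
        Console.println("Time taken:", elapsedMs, "result:", total)
      }
    }

Returning a (duration, result) tuple rather than the duration alone is what lets main run populateUserData() and denormUserData() through the wrapper and still sum res1._1 and res2._1 for the total script runtime.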
