Skip to content

Commit

Permalink
Merge pull request #75 from sowmya-dixit/release-3.6.0
Browse files Browse the repository at this point in the history
Changes to push job log events to kafka when executed on cluster
  • Loading branch information
anandp504 authored Jan 28, 2021
2 parents b17e0e7 + bf5e8f0 commit 2243661
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ import org.apache.logging.log4j.core.layout.PatternLayout
import java.nio.charset.Charset

import org.apache.logging.log4j.core.config.AppenderRef
import org.ekstep.analytics.framework.dispatcher.KafkaDispatcher
import org.joda.time.DateTime

object JobLogger {

implicit val fc = new FrameworkContext();

def init(jobName: String) = {
System.setProperty("logFilename", jobName.toLowerCase());
val ctx = LogManager.getContext(false).asInstanceOf[LoggerContext];
Expand All @@ -29,31 +32,35 @@ object JobLogger {
}

private def info(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) {
  // Serialize a JOB_LOG event at INFO level and hand it to the shared sink:
  // Kafka when the kafka appender is enabled, local log4j otherwise.
  // (The scraped diff retained the pre-change direct logger call alongside the
  // new logEvent call, which would double-log; only the merged path is kept.)
  val serializedEvent = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "INFO", msg, data, None, pdata_id, pdata_pid))
  logEvent(serializedEvent, name, INFO)
}

private def debug(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) {
  // Serialize a JOB_LOG event at DEBUG level. Routed through logEvent for
  // consistency with info/error; when the kafka appender is disabled,
  // logEvent's DEBUG branch is exactly the original logger(name).debug call,
  // so local behaviour is unchanged, and cluster runs now reach Kafka too.
  val serializedEvent = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "DEBUG", msg, data, None, pdata_id, pdata_pid))
  logEvent(serializedEvent, name, DEBUG)
}

private def error(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) {
  // Serialize a JOB_LOG event at ERROR level and hand it to the shared sink:
  // Kafka when the kafka appender is enabled, local log4j otherwise.
  // (The scraped diff retained the pre-change direct logger call alongside the
  // new logEvent call, which would double-log; only the merged path is kept.)
  val serializedEvent = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "ERROR", msg, data, None, pdata_id, pdata_pid))
  logEvent(serializedEvent, name, ERROR)
}

private def warn(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) {
  // Serialize a JOB_LOG event at WARN level. Routed through logEvent for
  // consistency with info/error, so WARN events also reach Kafka on cluster
  // runs. NOTE(review): the original wrote WARN events via logger(name).debug;
  // logEvent's WARN branch is the single place that decides the local level.
  val serializedEvent = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "WARN", msg, data, None, pdata_id, pdata_pid))
  logEvent(serializedEvent, name, WARN)
}

def start(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) = {
  // Emit a JOB_START event: publish on the internal event bus, then log it
  // via the shared sink (Kafka when enabled, local log4j otherwise).
  // (The scraped diff retained the pre-change logger(name).info(event) line
  // next to the new logEvent call, which would double-log; it is removed.)
  val event = JSONUtils.serialize(getV3JobEvent("JOB_START", "INFO", msg, data, None, pdata_id, pdata_pid))
  // NOTE(review): "dipatchEvent" typo lives in EventBusUtil's public API;
  // renaming it here would break the call, so it is kept as-is.
  EventBusUtil.dipatchEvent(event)
  logEvent(event, name, INFO)
}

def end(msg: String, status: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) = {
  // Emit a JOB_END event carrying the final job status: publish on the
  // internal event bus, then log it via the shared sink (Kafka when enabled,
  // local log4j otherwise).
  // (The scraped diff retained the pre-change logger(name).info(event) line
  // next to the new logEvent call, which would double-log; it is removed.)
  val event = JSONUtils.serialize(getV3JobEvent("JOB_END", "INFO", msg, data, Option(status), pdata_id, pdata_pid))
  EventBusUtil.dipatchEvent(event)
  logEvent(event, name, INFO)
}

def log(msg: String, data: Option[AnyRef] = None, logLevel: Level = DEBUG, name: String = "org.ekstep.analytics")(implicit className: String) = {
Expand All @@ -69,6 +76,26 @@ object JobLogger {
}
}

def logEvent(event: String, name: String = "org.ekstep.analytics", logLevel: Level = DEBUG) = {
  // Route an already-serialized log event to exactly one sink: when the kafka
  // appender is enabled (cluster execution) push it to the configured topic;
  // otherwise write it to the local log4j logger at the requested level.
  // NOTE(review): when Kafka is enabled the event is NOT also logged locally —
  // presumably intentional for cluster runs; confirm with the job owners.
  if (StringUtils.equalsIgnoreCase(AppConf.getConfig("log.appender.kafka.enable"), "true")) {
    val brokerList = AppConf.getConfig("log.appender.kafka.broker_host")
    val topic = AppConf.getConfig("log.appender.kafka.topic")
    KafkaDispatcher.dispatch(Array(event), Map("brokerList" -> brokerList, "topic" -> topic))
  } else {
    logLevel match {
      case INFO  => logger(name).info(event)
      case DEBUG => logger(name).debug(event)
      // Fix: WARN events were written via logger.debug (copy-paste defect),
      // silently dropping them whenever the logger threshold is above DEBUG.
      case WARN  => logger(name).warn(event)
      case ERROR => logger(name).error(event)
      // Guard against a MatchError if Level ever gains additional members.
      case _     => logger(name).debug(event)
    }
  }
}

private def getV3JobEvent(eid: String, level: String, msg: String, data: Option[AnyRef], status: Option[String] = None, pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String): V3DerivedEvent = {
val measures = Map(
"class" -> className,
Expand Down
5 changes: 5 additions & 0 deletions analytics-core/src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,8 @@ azure_storage_secret = azure-test-secret

aws_storage_key = aws-test-key
aws_storage_secret = aws-test-secret

# Job log Kafka appender config for cluster execution
log.appender.kafka.enable="false"
log.appender.kafka.broker_host="localhost:9092"
log.appender.kafka.topic="telemetry.log"
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class TestJobLogger extends BaseSpec {
JobLogger.log("testing warn method", None, WARN);
JobLogger.log("testing error method", None, ERROR);

JobLogger.logEvent("test event method", "org.ekstep.analytics", WARN)
JobLogger.logEvent("test event method", "org.ekstep.analytics", DEBUG)
}

}

0 comments on commit 2243661

Please sign in to comment.