diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala index 2c3a2459..02fac8ce 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala @@ -13,10 +13,13 @@ import org.apache.logging.log4j.core.layout.PatternLayout import java.nio.charset.Charset import org.apache.logging.log4j.core.config.AppenderRef +import org.ekstep.analytics.framework.dispatcher.KafkaDispatcher import org.joda.time.DateTime object JobLogger { + implicit val fc = new FrameworkContext(); + def init(jobName: String) = { System.setProperty("logFilename", jobName.toLowerCase()); val ctx = LogManager.getContext(false).asInstanceOf[LoggerContext]; @@ -29,31 +32,35 @@ object JobLogger { } private def info(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) { - logger(name).info(JSONUtils.serialize(getV3JobEvent("JOB_LOG", "INFO", msg, data, None, pdata_id, pdata_pid))); + val event = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "INFO", msg, data, None, pdata_id, pdata_pid)) + logEvent(event, name, INFO) } private def debug(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) { - logger(name).debug(JSONUtils.serialize(getV3JobEvent("JOB_LOG", "DEBUG", msg, data, None, pdata_id, pdata_pid))) + val event = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "DEBUG", msg, data, None, pdata_id, pdata_pid)) + logger(name).debug(event); } private def error(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: 
String = JobContext.jobName)(implicit className: String) { - logger(name).error(JSONUtils.serialize(getV3JobEvent("JOB_LOG", "ERROR", msg, data, None, pdata_id, pdata_pid))); + val event = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "ERROR", msg, data, None, pdata_id, pdata_pid)) + logEvent(event, name, ERROR) } private def warn(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) { - logger(name).debug(JSONUtils.serialize(getV3JobEvent("JOB_LOG", "WARN", msg, data, None, pdata_id, pdata_pid))) + val event = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "WARN", msg, data, None, pdata_id, pdata_pid)) + logger(name).debug(event); } def start(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) = { val event = JSONUtils.serialize(getV3JobEvent("JOB_START", "INFO", msg, data, None, pdata_id, pdata_pid)); EventBusUtil.dipatchEvent(event); - logger(name).info(event); + logEvent(event, name, INFO) } def end(msg: String, status: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) = { val event = JSONUtils.serialize(getV3JobEvent("JOB_END", "INFO", msg, data, Option(status), pdata_id, pdata_pid)); EventBusUtil.dipatchEvent(event); - logger(name).info(event); + logEvent(event, name, INFO) } def log(msg: String, data: Option[AnyRef] = None, logLevel: Level = DEBUG, name: String = "org.ekstep.analytics")(implicit className: String) = { @@ -69,6 +76,26 @@ object JobLogger { } } + def logEvent(event: String, name: String = "org.ekstep.analytics", logLevel: Level = DEBUG) = { + if (StringUtils.equalsIgnoreCase(AppConf.getConfig("log.appender.kafka.enable"), "true")) { + 
val brokerList = AppConf.getConfig("log.appender.kafka.broker_host") + val topic = AppConf.getConfig("log.appender.kafka.topic") + KafkaDispatcher.dispatch(Array(event), Map("brokerList" -> brokerList, "topic" -> topic)) + } + else { + logLevel match { + case INFO => + logger(name).info(event); + case DEBUG => + logger(name).debug(event); + case WARN => + logger(name).warn(event); + case ERROR => + logger(name).error(event); + } + } + } + private def getV3JobEvent(eid: String, level: String, msg: String, data: Option[AnyRef], status: Option[String] = None, pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String): V3DerivedEvent = { val measures = Map( "class" -> className, diff --git a/analytics-core/src/test/resources/application.conf b/analytics-core/src/test/resources/application.conf index 02fe51e5..b45684b3 100644 --- a/analytics-core/src/test/resources/application.conf +++ b/analytics-core/src/test/resources/application.conf @@ -52,3 +52,8 @@ azure_storage_secret = azure-test-secret aws_storage_key = aws-test-key aws_storage_secret = aws-test-secret + +# Joblog Kafka appender config for cluster execution +log.appender.kafka.enable="false" +log.appender.kafka.broker_host="localhost:9092" +log.appender.kafka.topic="telemetry.log" \ No newline at end of file diff --git a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestJobLogger.scala b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestJobLogger.scala index a85625d8..81174803 100644 --- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestJobLogger.scala +++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestJobLogger.scala @@ -44,6 +44,8 @@ class TestJobLogger extends BaseSpec { JobLogger.log("testing warn method", None, WARN); JobLogger.log("testing error method", None, ERROR); + JobLogger.logEvent("test event method", "org.ekstep.analytics", WARN) + JobLogger.logEvent("test 
event method", "org.ekstep.analytics", DEBUG) } } \ No newline at end of file