From bc3139674b23ad3601696137a24ccac8ab872291 Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Mon, 25 Jan 2021 13:07:40 +0530 Subject: [PATCH 1/5] Issue #TG-824 feat: Changes to push job log events to kafka when executed on cluster --- .../analytics/framework/util/JobLogger.scala | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala index 2c3a2459..b8d12ff9 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala @@ -13,10 +13,13 @@ import org.apache.logging.log4j.core.layout.PatternLayout import java.nio.charset.Charset import org.apache.logging.log4j.core.config.AppenderRef +import org.ekstep.analytics.framework.dispatcher.KafkaDispatcher import org.joda.time.DateTime object JobLogger { + implicit val fc = new FrameworkContext(); + def init(jobName: String) = { System.setProperty("logFilename", jobName.toLowerCase()); val ctx = LogManager.getContext(false).asInstanceOf[LoggerContext]; @@ -29,31 +32,41 @@ object JobLogger { } private def info(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) { + val logEvent = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "INFO", msg, data, None, pdata_id, pdata_pid)) logger(name).info(JSONUtils.serialize(getV3JobEvent("JOB_LOG", "INFO", msg, data, None, pdata_id, pdata_pid))); + logToKakfa(logEvent) } private def debug(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) { - logger(name).debug(JSONUtils.serialize(getV3JobEvent("JOB_LOG", "DEBUG", msg, data, None, pdata_id, pdata_pid))) + val logEvent = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "DEBUG", msg, data, None, pdata_id, pdata_pid)) + logger(name).debug(logEvent) +// logToKakfa(logEvent) } private def error(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) { - logger(name).error(JSONUtils.serialize(getV3JobEvent("JOB_LOG", "ERROR", msg, data, None, pdata_id, pdata_pid))); + val logEvent = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "ERROR", msg, data, None, pdata_id, pdata_pid)) + logger(name).error(logEvent); + logToKakfa(logEvent) } private def warn(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) { - logger(name).debug(JSONUtils.serialize(getV3JobEvent("JOB_LOG", "WARN", msg, data, None, pdata_id, pdata_pid))) + val logEvent = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "WARN", msg, data, None, pdata_id, pdata_pid)) + logger(name).debug(logEvent) +// logToKakfa(logEvent) } def start(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) = { val event = JSONUtils.serialize(getV3JobEvent("JOB_START", "INFO", msg, data, None, pdata_id, pdata_pid)); EventBusUtil.dipatchEvent(event); logger(name).info(event); + logToKakfa(event) } def end(msg: String, status: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) = { val event = JSONUtils.serialize(getV3JobEvent("JOB_END", "INFO", msg, data, Option(status), pdata_id, pdata_pid)); EventBusUtil.dipatchEvent(event); logger(name).info(event); + logToKakfa(event) } def log(msg: String, data: Option[AnyRef] = None, logLevel: Level = DEBUG, name: String = "org.ekstep.analytics")(implicit className: String) = { @@ -69,6 +82,14 @@ object JobLogger { } } + def logToKakfa(event: String) = { + if (StringUtils.equalsIgnoreCase(AppConf.getConfig("log4j.appender.kafka.enable"), "true")) { + val brokerList = AppConf.getConfig("log4j.appender.kafka.broker_host") + val topic = AppConf.getConfig("log4j.appender.kafka.topic") + KafkaDispatcher.dispatch(Array(event), Map("brokerList" -> brokerList, "topic" -> topic)) + } + } + private def getV3JobEvent(eid: String, level: String, msg: String, data: Option[AnyRef], status: Option[String] = None, pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String): V3DerivedEvent = { val measures = Map( "class" -> className, From 71fa9d4844108327d643ddc6e42ca7e6ad1bd790 Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Mon, 25 Jan 2021 13:52:16 +0530 Subject: [PATCH 2/5] Issue #TG-824 feat: Changes to push job log events to kafka when executed on cluster --- .../org/ekstep/analytics/framework/util/JobLogger.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala index b8d12ff9..d74f717d 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala @@ -83,9 +83,9 @@ object JobLogger { } def logToKakfa(event: String) = { - if (StringUtils.equalsIgnoreCase(AppConf.getConfig("log4j.appender.kafka.enable"), "true")) { - val brokerList = AppConf.getConfig("log4j.appender.kafka.broker_host") - val topic = AppConf.getConfig("log4j.appender.kafka.topic") + if (StringUtils.equalsIgnoreCase(AppConf.getConfig("log.appender.kafka.enable"), "true")) { + val brokerList = AppConf.getConfig("log.appender.kafka.broker_host") + val topic = AppConf.getConfig("log.appender.kafka.topic") KafkaDispatcher.dispatch(Array(event), Map("brokerList" -> brokerList, "topic" -> topic)) } } From 841b49bc2d9062c126b0b636e59928b7d055db29 Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Wed, 27 Jan 2021 10:42:43 +0530 Subject: [PATCH 3/5] Issue #TG-824 feat: Changes to push job log events to kafka when executed on cluster --- .../analytics/framework/util/JobLogger.scala | 40 +++++++++++-------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala index d74f717d..43f6bd1d 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala @@ -32,41 +32,35 @@ object JobLogger { } private def info(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) { - val logEvent = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "INFO", msg, data, None, pdata_id, pdata_pid)) - logger(name).info(JSONUtils.serialize(getV3JobEvent("JOB_LOG", "INFO", msg, data, None, pdata_id, pdata_pid))); - logToKakfa(logEvent) + val event = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "INFO", msg, data, None, pdata_id, pdata_pid)) + logEvent(event, name, INFO) } private def debug(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) { - val logEvent = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "DEBUG", msg, data, None, pdata_id, pdata_pid)) - logger(name).debug(logEvent) -// logToKakfa(logEvent) + val event = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "DEBUG", msg, data, None, pdata_id, pdata_pid)) + logEvent(event, name, DEBUG) } private def error(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) { - val logEvent = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "ERROR", msg, data, None, pdata_id, pdata_pid)) - logger(name).error(logEvent); - logToKakfa(logEvent) + val event = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "ERROR", msg, data, None, pdata_id, pdata_pid)) + logEvent(event, name, ERROR) } private def warn(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) { - val logEvent = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "WARN", msg, data, None, pdata_id, pdata_pid)) - logger(name).debug(logEvent) -// logToKakfa(logEvent) + val event = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "WARN", msg, data, None, pdata_id, pdata_pid)) + logEvent(event, name, WARN) } def start(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) = { val event = JSONUtils.serialize(getV3JobEvent("JOB_START", "INFO", msg, data, None, pdata_id, pdata_pid)); EventBusUtil.dipatchEvent(event); - logger(name).info(event); - logToKakfa(event) + logEvent(event, name, INFO) } def end(msg: String, status: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) = { val event = JSONUtils.serialize(getV3JobEvent("JOB_END", "INFO", msg, data, Option(status), pdata_id, pdata_pid)); EventBusUtil.dipatchEvent(event); - logger(name).info(event); - logToKakfa(event) + logEvent(event, name, INFO) } def log(msg: String, data: Option[AnyRef] = None, logLevel: Level = DEBUG, name: String = "org.ekstep.analytics")(implicit className: String) = { @@ -82,12 +76,24 @@ object JobLogger { } } - def logToKakfa(event: String) = { + def logEvent(event: String, name: String = "org.ekstep.analytics", logLevel: Level = DEBUG) = { if (StringUtils.equalsIgnoreCase(AppConf.getConfig("log.appender.kafka.enable"), "true")) { val brokerList = AppConf.getConfig("log.appender.kafka.broker_host") val topic = AppConf.getConfig("log.appender.kafka.topic") KafkaDispatcher.dispatch(Array(event), Map("brokerList" -> brokerList, "topic" -> topic)) } + else { + logLevel match { + case INFO => + logger(name).info(event); + case DEBUG => + logger(name).debug(event); + case WARN => + logger(name).debug(event); + case ERROR => + logger(name).error(event); + } + } } private def getV3JobEvent(eid: String, level: String, msg: String, data: Option[AnyRef], status: Option[String] = None, pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String): V3DerivedEvent = { From ee2365f554ac2f615f92b466c433c9457d462cdc Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Wed, 27 Jan 2021 10:55:38 +0530 Subject: [PATCH 4/5] Issue #TG-824 feat: Changes to push job log events to kafka when executed on cluster --- .../scala/org/ekstep/analytics/framework/util/JobLogger.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala index 43f6bd1d..02fac8ce 100644 --- a/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala +++ b/analytics-core/src/main/scala/org/ekstep/analytics/framework/util/JobLogger.scala @@ -38,7 +38,7 @@ object JobLogger { private def debug(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) { val event = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "DEBUG", msg, data, None, pdata_id, pdata_pid)) - logEvent(event, name, DEBUG) + logger(name).debug(event); } private def error(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) { @@ -48,7 +48,7 @@ object JobLogger { private def warn(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) { val event = JSONUtils.serialize(getV3JobEvent("JOB_LOG", "WARN", msg, data, None, pdata_id, pdata_pid)) - logEvent(event, name, WARN) + logger(name).debug(event); } def start(msg: String, data: Option[AnyRef] = None, name: String = "org.ekstep.analytics", pdata_id: String = "AnalyticsDataPipeline", pdata_pid: String = JobContext.jobName)(implicit className: String) = { From bf5e8f06c6702bbb7dc0428bb794349068f07dbd Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Wed, 27 Jan 2021 12:15:31 +0530 Subject: [PATCH 5/5] Issue #TG-824 feat: Changes to push job log events to kafka when executed on cluster- test cases --- analytics-core/src/test/resources/application.conf | 5 +++++ .../org/ekstep/analytics/framework/util/TestJobLogger.scala | 2 ++ 2 files changed, 7 insertions(+) diff --git a/analytics-core/src/test/resources/application.conf b/analytics-core/src/test/resources/application.conf index 02fe51e5..b45684b3 100644 --- a/analytics-core/src/test/resources/application.conf +++ b/analytics-core/src/test/resources/application.conf @@ -52,3 +52,8 @@ azure_storage_secret = azure-test-secret aws_storage_key = aws-test-key aws_storage_secret = aws-test-secret + +# Joblog Kafka appender config for cluster execution +log.appender.kafka.enable="false" +log.appender.kafka.broker_host="localhost:9092" +log.appender.kafka.topic="telemetry.log" \ No newline at end of file diff --git a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestJobLogger.scala b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestJobLogger.scala index a85625d8..81174803 100644 --- a/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestJobLogger.scala +++ b/analytics-core/src/test/scala/org/ekstep/analytics/framework/util/TestJobLogger.scala @@ -44,6 +44,8 @@ class TestJobLogger extends BaseSpec { JobLogger.log("testing warn method", None, WARN); JobLogger.log("testing error method", None, ERROR); + JobLogger.logEvent("test event method", "org.ekstep.analytics", WARN) + JobLogger.logEvent("test event method", "org.ekstep.analytics", DEBUG) } } \ No newline at end of file