Skip to content

Conversation

@rayman7718
Copy link
Contributor

This PR shows how the ListGauge can be used to emit exceptions using a DiagnosticsAppender.

  1. DiagnosticsAppender is enabled using a config (diagnostics.appender.enable)
  2. DiagnosticsAppender adds exception-events to a listgauge which is a samza container metric
  3. This ListGauge uses a time-and-count based eviction policy, so that exception-events are not emitted to Kafka(SnapshotReporter) forever.

rmatharu added 7 commits May 31, 2018 15:44
…pshot reporter.

Switched producer-shutdown logic to ensure metric-flush at shutdown-time
Added exception metric (as string-guage), to be emitted using the snapshot reporter.
Switched producer-shutdown logic to ensure metric-flush at shutdown-time

Adding ListGauge and integration with SamzaContainerMetric

Typifying ListGauge to ListGauge<T>, adding an eviction policy, default policy to retain last N
…mza container metric) using DiagnosticsAppender
Copy link
Contributor

@vjagadish1989 vjagadish1989 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'll be helpful to wrap up the original ListGauge PR first so that this is simpler to compare and review.


def startDiagnostics {
// TODO: where should this reside, MetricConfig? Log4jSystemConfig? or a new separate config?
val DIAGNOSTICS_APPENDER_ENABLE = "diagnostics.appender.enable"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/diagnostics.appender.enable/samza.diagnostics.enabled

return timestamp;
}

public void setTimestamp(long timestamp) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably unnecessary to expose setters

asfgit pushed a commit that referenced this pull request Jun 14, 2018
This PR introduces a ListGauge type,
A subsequent PR: #543 shows how this issued in conjunction with a diagnostics appender for error-tracking and dumpting to kafka via SnapshotReporter.

Author: Ray Matharu <[email protected]>

Reviewers: Jagadish <[email protected]>, Cameron Lee <[email protected]>

Closes #541 from rayman7718/listgauge
weisong44 pushed a commit to weisong44/samza that referenced this pull request Jun 22, 2018
This PR introduces a ListGauge type,
A subsequent PR: apache#543 shows how this issued in conjunction with a diagnostics appender for error-tracking and dumpting to kafka via SnapshotReporter.

Author: Ray Matharu <[email protected]>

Reviewers: Jagadish <[email protected]>, Cameron Lee <[email protected]>

Closes apache#541 from rayman7718/listgauge
Copy link
Contributor

@nickpan47 nickpan47 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fist pass, have a few questions. Will discuss face-to-face tomorrow.

}

def startDiagnostics {
// TODO: where should this reside, MetricConfig? Log4jSystemConfig? or a new separate config?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: is this a per job configuration? If yes, it would probably be JobConfig and should be job.diagnostics.enabled.

*/
public class DiagnosticsAppender extends AppenderSkeleton {

private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have been using the code convention like: logger = LoggerFactory.getLog(DiagnosticsAppender.class.getName()).

// if an event with a non-null throwable is received => exception event
if (loggingEvent.getThrowableInformation() != null) {
DiagnosticsExceptionEvent diagnosticsExceptionEvent =
new DiagnosticsExceptionEvent(loggingEvent.timeStamp, loggingEvent.getThrowableInformation());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, how do we get the context info regarding to the exception? Through the stack trace? How do we add the logic component context like sub-query, task instance, container, and thread context?

public class DiagnosticsAppender extends AppenderSkeleton {

private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());
private final ListGauge<DiagnosticsExceptionEvent> samzaContainerExceptionMetric;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: how is this metric serialized and sent to diagnostic topic? If yes, shouldn't DiagnosticsExceptionEvent be serializable?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Existing StreamAppender implementation uses LoggingEventJsonSerde to serialize the loggingEvent. Would be better to use the same if it satisfies all the requirement here(or add fields ContainerContext and operatorContext if we need it).

*/
package org.apache.samza.diagnostics;

import org.apache.log4j.spi.ThrowableInformation;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be better not to depend on log4j specific objects in your model. If we choose to update to log4j2 in samza or switch to some other logging library(logback2), then this has to be changed(along with Appender implementation).

// TODO: where should this reside, MetricConfig? Log4jSystemConfig? or a new separate config?
val DIAGNOSTICS_APPENDER_ENABLE = "samza.diagnostics.enabled"
if (containerContext.config.getBoolean(DIAGNOSTICS_APPENDER_ENABLE, false)) {
import org.apache.log4j.Logger
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Better to move the imports to the top.

*/
public class DiagnosticsAppender extends AppenderSkeleton {

private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good practice to make the logger variable static and share it across all instances of the class.

* When used inconjunction with {@link org.apache.samza.metrics.reporter.MetricsSnapshotReporter} provides a
* stream of diagnostics-related events.
*/
public class DiagnosticsAppender extends AppenderSkeleton {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious. Do we need to have separate implementation for samza jobs using log4j2?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same implementation can be adopted, as of log4j2.4.

public class DiagnosticsAppender extends AppenderSkeleton {

private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());
private final ListGauge<DiagnosticsExceptionEvent> samzaContainerExceptionMetric;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Existing StreamAppender implementation uses LoggingEventJsonSerde to serialize the loggingEvent. Would be better to use the same if it satisfies all the requirement here(or add fields ContainerContext and operatorContext if we need it).


info("Starting Diagnostics Appender.")
val diagnosticsAppender = new DiagnosticsAppender(this.metrics)
rootLogger.addAppender(diagnosticsAppender)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In standalone, there're plans to run more than one StreamProcessor in a jvm(which translates to more than one SamzaContainer thread in jvm).
So it will be better to add diagnosticsAppender to rootLogger if it is not already added.

@rayman7718
Copy link
Contributor Author

rayman7718 commented Jul 10, 2018

Made the following changes:

  • Depend directly on using java objects rather than log4j objects in appender (done)

  • Check if logger already attached, only than attach appender to root logger (done)

  • JobConfig for samza.diagnostics.enabled (done)

  • SnapshotSerdeV2 to serialize Throwables with type and other relevant info for error tracking (done), with unit test.

  • Obtain/store an exception's context in appender

  1. Container-level context is already present in MetricsHeaders.

  2. Adding TaskContext to SamzaException didnt quite work because TaskContext doesn't apply to many components (like KafkaSysCons).

  3. Instead, we use the MDC map associated with the exception. Context can then be added by the components to MDC map, e.g., topicName, espressoTableName, etc. LOg4j's LoggingEvent takes care of multi-threading and async-appender.

  4. In cases, such as KafkaSysConsumer the poll is for a set of SSPs and the exception message (with the erroring topic name) is populated by KafkaConsumer (which Samza has no control over).

Copy link
Contributor

@nickpan47 nickpan47 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rayman7718 thanks for the update! I have a few comments regarding the usage of log4j in Samza code base. Please check. Thanks!

import org.apache.samza.util._
import org.apache.samza.{SamzaContainerStatus, SamzaException}
import org.apache.samza.diagnostics.DiagnosticsAppender;
import org.apache.log4j.Logger
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not using log4j directly in samza-core. We always use slf4j libraries in all our source code, except for the samza-log4j module.

if (containerContext.config.getDiagnosticsEnabled) {
val rootLogger = Logger.getRootLogger

if (rootLogger.getAppender(classOf[DiagnosticsAppender].getName) == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that in log4j2, the API to get appender dynamically also changed: https://logging.apache.org/log4j/2.x/log4j-core/apidocs/org/apache/logging/log4j/core/config/Configuration.html#getAppender(java.lang.String). Please check w/ @PawasChhokra to see what's the best way to make sure the code here is log4j version agnostic.

if (containerContext.config.getDiagnosticsEnabled) {
val rootLogger = Logger.getRootLogger

if (rootLogger.getAppender(classOf[DiagnosticsAppender].getName) == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this condition be (rootLogger.getAppender(classOf[DiagnosticsAppender].getName) != null)?

import java.util.Collections
import java.util.HashMap
import java.util.Map

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: whitespace change should be avoided.


def getAsMap(): Map[String, Map[String, Object]] = Collections.unmodifiableMap(immutableMetrics)

def this() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a code search and couldn't find any reference to this default constructor. What's the reason to add it?


package org.apache.samza.metrics.reporter

import java.util
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be redundant, if we remove the prefix to HashMap in line 59.

@rayman7718
Copy link
Contributor Author

  • Addressed the log4j dependency issue by using a config for the Appender class, this removes the use of log4j Logger from SamzaContainer

  • For log4j2, verified with Pawas that the getappender and addAppender apis are consistent.

  • Addressed the other Metrics comments.

Copy link
Contributor

@nickpan47 nickpan47 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rayman7718 thanks a lot for the updates. Still have a few minor comments, but looks good overall.


def getDiagnosticsEnabled = { getBoolean(JobConfig.JOB_DIAGNOSTICS_ENABLED, false) }

def getDiagnosticsAppenderClass = { getOption(JobConfig.DIAGNOSTICS_APPENDER_CLASS) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that it would make sense to provide a default class name here as well, for log4j 1.2.x (since in open source, we still depend on log4j 1.2.x).

def startDiagnostics {
if (containerContext.config.getDiagnosticsEnabled) {
info("Starting diagnostics.")
val diagnosticsAppender = Class.forName(containerContext.config.getDiagnosticsAppenderClass.get).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to capture the class loading exceptions and log a line here, then throw them as ConfigException(msg, e).

this.setName(SimpleDiagnosticsAppender.class.getName());

// ensure appender is attached only once per JVM (regardless of #containers)
if (org.apache.log4j.Logger.getRootLogger().getAppender(SimpleDiagnosticsAppender.class.getName()) == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just realize: is this code section safe for multi-threaded environment? Ideally, we should make it threadsafe. If not, we better make 100% sure that the loading of this appender only happens in one single thread and make the usage documentation here very clear.

…nder instantiation, adding default value in jobConfig
@rayman7718
Copy link
Contributor Author

  • Added simple synchronization to make appender attachment thread safe (from constructor).
  • Added default value for appender class path in job config.
  • Adding error logging in samza-container for appender instantiation.

Copy link
Contributor

@nickpan47 nickpan47 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fixes! Feel free to check after fix the minor comments.

def getDiagnosticsEnabled = { getBoolean(JobConfig.JOB_DIAGNOSTICS_ENABLED, false) }

def getDiagnosticsAppenderClass = {
getOrDefault(JobConfig.DIAGNOSTICS_APPENDER_CLASS, "org.apache.samza.logging.log4j.SimpleDiagnosticsAppender")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: define a static constant for the default value in the class.

catch {
case e@(_: ClassNotFoundException | _: InstantiationException | _: InvocationTargetException) => {
error("Failed to instantiate diagnostic appender", e)
throw new ConfigException("Failed to instantiate diagnostic appender class specified in config", e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add the configured DiagnosticsAppenderClass name in the log and exception would make it easier for debugging.

@rayman7718 rayman7718 changed the title Populating ListGauge metric using DiagnosticsAppender for exceptions JIRA: SAMZA-1733 Populating ListGauge metric using DiagnosticsAppender for exceptions Jul 26, 2018
@asfgit asfgit closed this in e0ff4c5 Jul 27, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants