Skip to content
Closed
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
5ae1df9
Added exception metric (as string-guage), to be emitted using the sna…
rmatharu May 10, 2018
5a3ef7b
Adding ListGauge and integration with SamzaContainerMetric
rmatharu May 30, 2018
75ef5d2
Typifying ListGauge to ListGauge<T>, adding an eviction policy, defau…
rmatharu May 31, 2018
1efed5a
Adding metric type ListGauge
rmatharu May 10, 2018
d2922c5
Merge branch 'listgauge' of https://github.com/rayman7718/samza into …
rmatharu May 31, 2018
6602fa0
Adding DiagnosticsAppender, Populating ListGauge exception metric (sa…
rmatharu Jun 1, 2018
054d507
Adding DiagnosticsExceptionEventEvictionPolicy to evict stale events …
rmatharu Jun 2, 2018
1f08eae
Removing remove() and policy-configurability from ListGauge, exposing…
rmatharu Jun 4, 2018
b598b92
Merge branch 'listgauge' into diagnosticsappender
rmatharu Jun 4, 2018
6f19ba0
Renaming valueList to elements, and getValue to getValues
rmatharu Jun 4, 2018
c22e252
Merge branch 'listgauge' into diagnosticsappender
rmatharu Jun 4, 2018
4f08cd1
Removing the interface for ListGaugeEvictionPolicy
rmatharu Jun 5, 2018
9f98c57
Merge branch 'listgauge' into diagnosticsappender
rmatharu Jun 5, 2018
906afeb
Removing ValueInfo class and moving TimestampedValue to samza-api util
rmatharu Jun 5, 2018
bdc85d4
Removing ValueInfo class and moving TimestampedValue to samza-api util
rmatharu Jun 5, 2018
5c1ed0a
Removing ValueInfo class and moving TimestampedValue to samza-api util
rmatharu Jun 5, 2018
c1884e4
Merge branch 'listgauge' of https://github.com/rayman7718/samza into …
rmatharu Jun 5, 2018
b0695b4
Merge branch 'listgauge' into diagnosticsappender
rmatharu Jun 5, 2018
508b6b3
Renaming diagnostics.appender.enable to samza.diagnostics.enabled
rmatharu Jun 5, 2018
c7821ba
Removing periodic pruning from Listgauge, pruning on add and get in L…
rmatharu Jun 5, 2018
9ba8e5b
Updating testListGauge for new listgauge
rmatharu Jun 5, 2018
64b1cb8
Merge branch 'listgauge' into diagnosticsappender
rmatharu Jun 6, 2018
fd77c53
Complete removing evictionpolicy separation, merging all eviction log…
rmatharu Jun 6, 2018
cc4d881
Reverting superflous comment changes, indentation changes
rmatharu Jun 8, 2018
172ab20
Merge branch 'listgauge' into diagnosticsappender
rmatharu Jun 8, 2018
863d850
Minor changes
rmatharu Jun 9, 2018
cac4ecb
Merge branch 'listgauge' into diagnosticsappender
rmatharu Jun 9, 2018
925473c
Adding throwable cause messages and class names to exception event
rmatharu Jun 9, 2018
2ad0264
Fixing minor bug around diagnostic-event-initialization, adding blank…
rmatharu Jun 13, 2018
60f281c
Merge branch 'master' of https://github.com/apache/samza into diagnos…
rmatharu Jun 21, 2018
cd58669
Removing thread name
rmatharu Jun 21, 2018
8a66d16
Adding throwableInformation to DiagnosticsExceptionEvent, removing ot…
rmatharu Jun 22, 2018
48cbf3e
Moving diagnostics.enabled to jobconfig, adding appender-add check, m…
rmatharu Jun 29, 2018
346affd
Adding MDCMap to DiagnosticExceptionEvent for storing context
rmatharu Jul 10, 2018
7f7885b
Adding default constructor to enable serializing and deserializing
rmatharu Jul 11, 2018
92a29b4
Adding SnapshotSerdeV2 to enable serializing of ExceptionEvents for d…
rmatharu Jul 11, 2018
e9f4004
Serializing exceptions as throwables, storing classtype separately.
rmatharu Jul 14, 2018
28c4fec
Minor changes to ensure build
rmatharu Jul 14, 2018
edbd200
Fixing dependencies in Metrics.scala
rmatharu Jul 23, 2018
f8ace28
Moving addAppender to DiagnosticsAppender, using config and reflectio…
rmatharu Jul 24, 2018
a4d11b0
efactoring useless code out
rmatharu Jul 24, 2018
6e274e7
Making appender-attachment thread safe, adding error logging for appe…
rmatharu Jul 26, 2018
7c727f7
Adding constant to store class, adding class name to log message
rmatharu Jul 26, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ object JobConfig {
// across application restarts
val JOB_LOGGED_STORE_BASE_DIR = "job.logged.store.base.dir"

// Enables the diagnostics appender, which captures logged exception events
val JOB_DIAGNOSTICS_ENABLED = "job.diagnostics.enabled"

/** Implicit conversion that wraps a `Config` in a `JobConfig`, so the `JobConfig` accessors can be called on it. */
implicit def Config2Job(config: Config) = new JobConfig(config)

/**
Expand Down Expand Up @@ -186,4 +189,6 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
/** Looks up the `JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR` key through `getOption`. */
def getNonLoggedStorePath = getOption(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR)

/** Looks up the `JobConfig.JOB_LOGGED_STORE_BASE_DIR` key through `getOption`. */
def getLoggedStorePath = getOption(JobConfig.JOB_LOGGED_STORE_BASE_DIR)

/**
 * Reads the `job.diagnostics.enabled` flag (`JobConfig.JOB_DIAGNOSTICS_ENABLED`),
 * passing `false` to `getBoolean` as the fallback value.
 */
def getDiagnosticsEnabled = getBoolean(JobConfig.JOB_DIAGNOSTICS_ENABLED, false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ import org.apache.samza.task._
import org.apache.samza.util.Util
import org.apache.samza.util._
import org.apache.samza.{SamzaContainerStatus, SamzaException}
import org.apache.samza.diagnostics.DiagnosticsAppender;
import org.apache.log4j.Logger

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not using log4j directly in samza-core. We always use slf4j libraries in all our source code, except for the samza-log4j module.


import scala.collection.JavaConverters._

Expand Down Expand Up @@ -757,6 +759,7 @@ class SamzaContainer(
jmxServer = new JmxServer()

startMetrics
startDiagnostics
startAdmins
startOffsetManager
startLocalityManager
Expand Down Expand Up @@ -904,6 +907,17 @@ class SamzaContainer(
})
}

def startDiagnostics {
if (containerContext.config.getDiagnosticsEnabled) {
val rootLogger = Logger.getRootLogger

if (rootLogger.getAppender(classOf[DiagnosticsAppender].getName) == null) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that in log4j2, the API to get appender dynamically also changed: https://logging.apache.org/log4j/2.x/log4j-core/apidocs/org/apache/logging/log4j/core/config/Configuration.html#getAppender(java.lang.String). Please check w/ @PawasChhokra to see what's the best way to make sure the code here is log4j version agnostic.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this condition be (rootLogger.getAppender(classOf[DiagnosticsAppender].getName) != null)?

info("Starting diagnostics appender.")
rootLogger.addAppender(new DiagnosticsAppender(this.metrics))
}
}
}

def startOffsetManager {
info("Registering task instances with offsets.")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.apache.samza.container

import java.util

import org.apache.samza.diagnostics.DiagnosticsExceptionEvent
import org.apache.samza.metrics.{Gauge, ReadableMetricsRegistry, MetricsRegistryMap, MetricsHelper}

class SamzaContainerMetrics(
Expand Down Expand Up @@ -48,7 +49,7 @@ class SamzaContainerMetrics(

val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]()

val exceptions = newListGauge[String]("exceptions")
val exceptions = newListGauge[DiagnosticsExceptionEvent]("exceptions")

def addStoreRestorationGauge(taskName: TaskName, storeName: String) {
taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.diagnostics;

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.metrics.ListGauge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Provides an in-memory appender that parses LoggingEvents to filter events relevant to diagnostics.
* Currently, it filters exception-related events and updates an exception metric ({@link ListGauge}) in
* {@link SamzaContainerMetrics}.
*
* When used in conjunction with {@link org.apache.samza.metrics.reporter.MetricsSnapshotReporter}, it provides a
* stream of diagnostics-related events.
*/
public class DiagnosticsAppender extends AppenderSkeleton {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious. Do we need to have separate implementation for samza jobs using log4j2?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same implementation can be adopted, as of log4j2.4.


private static final Logger LOG = LoggerFactory.getLogger(DiagnosticsAppender.class);
private final ListGauge<DiagnosticsExceptionEvent> samzaContainerExceptionMetric;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: how is this metric serialized and sent to the diagnostics topic? If it is serialized, shouldn't DiagnosticsExceptionEvent be serializable?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing StreamAppender implementation uses LoggingEventJsonSerde to serialize the loggingEvent. It would be better to use the same serde if it satisfies all the requirements here (or to add ContainerContext and operatorContext fields if we need them).


public DiagnosticsAppender(SamzaContainerMetrics samzaContainerMetrics) {
this.samzaContainerExceptionMetric = samzaContainerMetrics.exceptions();
this.setName(DiagnosticsAppender.class.getName());
}

/**
 * Records every logging event that carries a throwable as a {@link DiagnosticsExceptionEvent} in the
 * exceptions gauge; events without a throwable are only mentioned at debug level.
 *
 * Events produced by this appender's own logger are skipped, and any exception raised while handling an
 * event is caught and logged so that a diagnostics failure never propagates to the code that was logging.
 *
 * @param loggingEvent the log4j event handed to this appender
 */
@Override
protected void append(LoggingEvent loggingEvent) {

  // Re-entrancy guard: this method logs through LOG. When this appender is attached to a logger that LOG
  // inherits from (e.g. the root logger) and slf4j is bound to log4j, each LOG call below is delivered
  // straight back to append(), which would otherwise recurse without bound.
  if (LOG.getName().equals(loggingEvent.getLoggerName())) {
    return;
  }

  try {
    // if an event with a non-null throwable is received => exception event
    if (loggingEvent.getThrowableInformation() != null) {
      DiagnosticsExceptionEvent diagnosticsExceptionEvent =
          new DiagnosticsExceptionEvent(loggingEvent.timeStamp, loggingEvent.getThrowableInformation().getThrowable(),
              loggingEvent.getProperties());

      samzaContainerExceptionMetric.add(diagnosticsExceptionEvent);

      // Only build the message when it will actually be emitted; append() runs for every log statement.
      if (LOG.isDebugEnabled()) {
        LOG.debug("Received DiagnosticsExceptionEvent " + diagnosticsExceptionEvent);
      }
    } else if (LOG.isDebugEnabled()) {
      LOG.debug("Received non-exception event with message " + loggingEvent.getMessage());
    }
  } catch (Exception e) {
    // blanket catch of all exceptions so as to not impact any job
    LOG.error("Exception in logging event parsing", e);
  }
}

/**
 * Intentionally a no-op: this method releases nothing and changes no state.
 */
@Override
public void close() {
// Do nothing.
}

/**
 * Returns false since this appender requires no layout: {@code append} reads the throwable, timestamp and
 * properties directly from the {@link LoggingEvent} and never formats the event as text.
 *
 * @return false
 */
@Override
public boolean requiresLayout() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.diagnostics;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;


/**
* This class encapsulates information related to an exception event that is useful for diagnostics.
* It used to define container, task, and other metrics as
* {@link org.apache.samza.metrics.ListGauge} of type {@link DiagnosticsExceptionEvent}.
*/
public class DiagnosticsExceptionEvent {

private long timestamp; // the timestamp associated with this exception
private Class exceptionType; // store the exception type separately
private Throwable throwable;
private Map mdcMap;
// the MDC map associated with this exception, used to store/obtain any context associated with the throwable

public DiagnosticsExceptionEvent() {
}

public DiagnosticsExceptionEvent(long timestampMillis, Throwable throwable, Map mdcMap) {
this.throwable = throwable;
this.exceptionType = throwable.getClass();
this.timestamp = timestampMillis;
this.mdcMap = new HashMap(mdcMap);
}

public long getTimestamp() {
return timestamp;
}

public Throwable getThrowable() {
return this.throwable;
}

public Class getExceptionType() {
return this.exceptionType;
}

public Map getMdcMap() {
return mdcMap;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DiagnosticsExceptionEvent that = (DiagnosticsExceptionEvent) o;

// Throwable provides no equals impl, so we assume message & stacktrace equality suffices
return timestamp == that.timestamp && this.exceptionType.equals(that.exceptionType) && mdcMap.equals(that.mdcMap)
&& this.throwable.getMessage().equals(that.throwable.getMessage()) && Arrays.equals(
this.throwable.getStackTrace(), that.throwable.getStackTrace());
}

@Override
public int hashCode() {
return Objects.hash(timestamp, exceptionType, throwable, mdcMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

package org.apache.samza.metrics.reporter

import java.util

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be redundant, if we remove the prefix to HashMap in line 59.

import java.util.Collections
import java.util.HashMap
import java.util.Map

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: whitespace change should be avoided.

import scala.collection.JavaConverters._

object Metrics {
Expand Down Expand Up @@ -52,4 +54,8 @@ class Metrics(metrics: Map[String, Map[String, Object]]) {
/** Returns whatever `immutableMetrics` holds for `group` (the result for a missing group follows that map's `get`). */
def get(group: String) = immutableMetrics.get(group)

/** Returns `immutableMetrics` wrapped in `Collections.unmodifiableMap`. */
def getAsMap(): Map[String, Map[String, Object]] = Collections.unmodifiableMap(immutableMetrics)

def this() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a code search and couldn't find any reference to this default constructor. What's the reason to add it?

this(new util.HashMap[String, Map[String, Object]]())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.serializers;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.samza.metrics.reporter.MetricsSnapshot;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * {@link Serde} for {@link MetricsSnapshot} that reads and writes UTF-8 JSON through a Jackson
 * {@link ObjectMapper} with default typing enabled, so that typed metric values are written with their
 * type information.
 *
 * Both directions are best-effort: an {@link IOException} is logged and {@code null} is returned instead
 * of propagating the failure. A {@code null} input is passed through as {@code null}.
 */
public class MetricsSnapshotSerdeV2 implements Serde<MetricsSnapshot> {

  private static final Logger LOG = LoggerFactory.getLogger(MetricsSnapshotSerdeV2.class);
  private final ObjectMapper objectMapper;

  public MetricsSnapshotSerdeV2() {
    objectMapper = new ObjectMapper();
    // NOTE(review): default typing makes Jackson instantiate the classes named in the payload. That is a
    // known deserialization risk when the bytes can come from an untrusted producer - confirm that only
    // trusted writers feed this serde.
    objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.OBJECT_AND_NON_CONCRETE);
  }

  /**
   * @param bytes JSON produced by {@link #toBytes(MetricsSnapshot)}; may be null
   * @return the decoded snapshot, or null when {@code bytes} is null or cannot be parsed
   */
  @Override
  public MetricsSnapshot fromBytes(byte[] bytes) {
    if (bytes == null) {
      return null;
    }
    try {
      return MetricsSnapshot.fromMap(objectMapper.readValue(bytes, HashMap.class));
    } catch (IOException e) {
      // Returning null drops the snapshot, so report it as an error rather than at INFO.
      LOG.error("Exception while deserializing", e);
      return null;
    }
  }

  /**
   * @param metricsSnapshot the snapshot to encode; may be null
   * @return UTF-8 encoded JSON, or null when {@code metricsSnapshot} is null or cannot be serialized
   */
  @Override
  public byte[] toBytes(MetricsSnapshot metricsSnapshot) {
    if (metricsSnapshot == null) {
      return null;
    }
    try {
      return objectMapper.writeValueAsString(convertMap(metricsSnapshot.getAsMap()))
          .getBytes(java.nio.charset.StandardCharsets.UTF_8);
    } catch (IOException e) {
      // Returning null drops the snapshot, so report it as an error rather than at INFO.
      LOG.error("Exception while serializing", e);
      return null;
    }
  }

  /** Metrics returns an UnmodifiableMap.
   * Unmodifiable maps should not be serialized with type, because UnmodifiableMap cannot be deserialized.
   * So we convert to HashMap, recursing into every value that is itself a Map.
   */
  @SuppressWarnings("unchecked")
  private HashMap<String, Object> convertMap(Map<String, Object> map) {
    HashMap<String, Object> retVal = new HashMap<>(map);
    for (Map.Entry<String, Object> entry : map.entrySet()) {
      if (entry.getValue() instanceof Map) {
        retVal.put(entry.getKey(), convertMap((Map<String, Object>) entry.getValue()));
      }
    }
    return retVal;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.serializers;

import org.apache.samza.config.Config;
import org.apache.samza.metrics.reporter.MetricsSnapshot;


/**
 * {@link SerdeFactory} that hands out {@link MetricsSnapshotSerdeV2} instances.
 */
public class MetricsSnapshotSerdeV2Factory implements SerdeFactory<MetricsSnapshot> {

  /**
   * Builds a fresh {@link MetricsSnapshotSerdeV2} on every call; neither argument is consulted.
   *
   * @param name unused
   * @param config unused
   * @return a new serde instance
   */
  @Override
  public Serde<MetricsSnapshot> getSerde(String name, Config config) {
    final Serde<MetricsSnapshot> serde = new MetricsSnapshotSerdeV2();
    return serde;
  }
}
Loading