diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 75e8005eae..7cebcc6424 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -88,6 +88,13 @@ object JobConfig {
   // across application restarts
   val JOB_LOGGED_STORE_BASE_DIR = "job.logged.store.base.dir"
 
+  // Config key that switches on the diagnostics appender used to record logged exceptions
+  val JOB_DIAGNOSTICS_ENABLED = "job.diagnostics.enabled"
+
+  // Config key naming the diagnostics appender class, followed by the class used when it is unset
+  val DIAGNOSTICS_APPENDER_CLASS = "job.diagnostics.appender.class"
+  val DEFAULT_DIAGNOSTICS_APPENDER_CLASS = "org.apache.samza.logging.log4j.SimpleDiagnosticsAppender"
+
   implicit def Config2Job(config: Config) = new JobConfig(config)
 
   /**
@@ -186,4 +193,10 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getNonLoggedStorePath = getOption(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR)
 
   def getLoggedStorePath = getOption(JobConfig.JOB_LOGGED_STORE_BASE_DIR)
+
+  def getDiagnosticsEnabled = getBoolean(JobConfig.JOB_DIAGNOSTICS_ENABLED, false)
+
+  // Appender class to instantiate; JobConfig.DEFAULT_DIAGNOSTICS_APPENDER_CLASS when not configured
+  def getDiagnosticsAppenderClass =
+    getOrDefault(JobConfig.DIAGNOSTICS_APPENDER_CLASS, JobConfig.DEFAULT_DIAGNOSTICS_APPENDER_CLASS)
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index be0fb26d06..ca3377a746 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -21,6 +21,7 @@ package org.apache.samza.container
 
 import java.io.File
 import java.lang.management.ManagementFactory
+import java.lang.reflect.InvocationTargetException
 import java.net.{URL, UnknownHostException}
 import java.nio.file.Path
 import java.time.Duration
@@ -30,7 +31,6 @@ import
java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorServic
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.util.concurrent.ThreadFactoryBuilder
-import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.MetricsConfig.Config2Metrics
@@ -53,8 +53,7 @@ import org.apache.samza.system._
 import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, RoundRobinChooserFactory}
 import org.apache.samza.table.TableManager
 import org.apache.samza.task._
-import org.apache.samza.util.Util
-import org.apache.samza.util._
+import org.apache.samza.util.{Util, _}
 import org.apache.samza.{SamzaContainerStatus, SamzaException}
 
 import scala.collection.JavaConverters._
@@ -757,6 +756,7 @@ class SamzaContainer(
     jmxServer = new JmxServer()
 
     startMetrics
+    startDiagnostics
     startAdmins
     startOffsetManager
     startLocalityManager
@@ -904,6 +904,35 @@ class SamzaContainer(
     })
   }
 
+  /**
+   * Starts exception diagnostics when `job.diagnostics.enabled` is true, by reflectively
+   * instantiating the configured appender class with this container's [[SamzaContainerMetrics]].
+   *
+   * @throws ConfigException if the appender class cannot be loaded, has no accessible
+   *                         (SamzaContainerMetrics) constructor, or fails while being constructed
+   */
+  def startDiagnostics {
+    if (containerContext.config.getDiagnosticsEnabled) {
+      info("Starting diagnostics.")
+
+      val appenderClassName = containerContext.config.getDiagnosticsAppenderClass
+      try {
+        // The instance is not retained here: the default SimpleDiagnosticsAppender registers itself
+        // with the log4j root logger from its constructor.
+        Class.forName(appenderClassName)
+          .getDeclaredConstructor(classOf[SamzaContainerMetrics])
+          .newInstance(this.metrics)
+      } catch {
+        // getDeclaredConstructor / newInstance can also fail with NoSuchMethodException and
+        // IllegalAccessException; surface those as configuration errors as well.
+        case e@(_: ClassNotFoundException | _: NoSuchMethodException | _: InstantiationException |
+                _: IllegalAccessException | _: InvocationTargetException) =>
+          error("Failed to instantiate diagnostic appender", e)
+          throw new ConfigException("Failed to instantiate diagnostic appender class " + appenderClassName, e)
+      }
+    }
+  }
+
   def startOffsetManager {
     info("Registering task instances with offsets.")
 
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index a26e6669bf..d5cf6c653d 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -21,6 +21,7 @@ package org.apache.samza.container
 
 import java.util
 
+import org.apache.samza.diagnostics.DiagnosticsExceptionEvent
 import org.apache.samza.metrics.{Gauge, ReadableMetricsRegistry, MetricsRegistryMap, MetricsHelper}
 
 class SamzaContainerMetrics(
@@ -48,7 +49,7 @@ class SamzaContainerMetrics(
 
   val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]()
 
-  val exceptions = newListGauge[String]("exceptions")
+  val exceptions = newListGauge[DiagnosticsExceptionEvent]("exceptions")
 
   def addStoreRestorationGauge(taskName: TaskName, storeName: String) {
     taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L))
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsExceptionEvent.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsExceptionEvent.java
new file mode 100644
index 0000000000..d87249e788
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsExceptionEvent.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache
Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.diagnostics;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+
+/**
+ * This class encapsulates information related to an exception event that is useful for diagnostics.
+ * It is used to define container, task, and other metrics as
+ * {@link org.apache.samza.metrics.ListGauge} of type {@link DiagnosticsExceptionEvent}.
+ */
+public class DiagnosticsExceptionEvent {
+
+  private long timestamp; // the timestamp associated with this exception
+  private Class exceptionType; // store the exception type separately
+  private Throwable throwable;
+  // the MDC map associated with this exception, used to store/obtain any context associated with the throwable
+  private Map mdcMap;
+
+  /**
+   * No-argument constructor; leaves every field at its default (0 / null).
+   * NOTE(review): presumably required for reflective deserialization; confirm before removing.
+   */
+  public DiagnosticsExceptionEvent() {
+  }
+
+  /**
+   * @param timestampMillis time of the exception event, in milliseconds
+   * @param throwable the throwable being recorded; must not be null (its class is read here)
+   * @param mdcMap context associated with the throwable; copied defensively, null is treated as empty
+   */
+  public DiagnosticsExceptionEvent(long timestampMillis, Throwable throwable, Map mdcMap) {
+    this.throwable = throwable;
+    this.exceptionType = throwable.getClass();
+    this.timestamp = timestampMillis;
+    this.mdcMap = mdcMap == null ? new HashMap() : new HashMap(mdcMap);
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public Throwable getThrowable() {
+    return this.throwable;
+  }
+
+  public Class getExceptionType() {
+    return this.exceptionType;
+  }
+
+  public Map getMdcMap() {
+    return mdcMap;
+  }
+
+  /**
+   * Two events are equal when timestamp, exception type, MDC map, throwable message and throwable
+   * stack trace all match. Every comparison is null-safe, so throwables without a message and
+   * instances created through the no-argument constructor can be compared.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DiagnosticsExceptionEvent that = (DiagnosticsExceptionEvent) o;
+
+    // Throwable provides no equals impl, so we assume message & stacktrace equality suffices
+    return timestamp == that.timestamp
+        && Objects.equals(this.exceptionType, that.exceptionType)
+        && Objects.equals(this.mdcMap, that.mdcMap)
+        && Objects.equals(messageOf(this.throwable), messageOf(that.throwable))
+        && Arrays.equals(stackTraceOf(this.throwable), stackTraceOf(that.throwable));
+  }
+
+  /**
+   * Hashes exactly the components compared by {@link #equals(Object)}. The throwable itself is not
+   * hashed because Throwable uses identity hashing, which would give equal events different hashes.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hash(timestamp, exceptionType, mdcMap, messageOf(throwable),
+        Arrays.hashCode(stackTraceOf(throwable)));
+  }
+
+  // Message of the throwable, or null when there is no throwable.
+  private static String messageOf(Throwable t) {
+    return t == null ? null : t.getMessage();
+  }
+
+  // Stack trace of the throwable, or null when there is no throwable.
+  private static StackTraceElement[] stackTraceOf(Throwable t) {
+    return t == null ? null : t.getStackTrace();
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
index 8a58cd2e80..218157e846 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
@@ -19,9
+19,7 @@
 
 package org.apache.samza.metrics.reporter
 
-import java.util.Collections
-import java.util.HashMap
-import java.util.Map
+import java.util.{Collections, HashMap, Map}
 import scala.collection.JavaConverters._
 
 object Metrics {
@@ -52,4 +50,9 @@ class Metrics(metrics: Map[String, Map[String, Object]]) {
   def get(group: String) = immutableMetrics.get(group)
 
   def getAsMap(): Map[String, Map[String, Object]] = Collections.unmodifiableMap(immutableMetrics)
+
+  /**
+   * No-argument constructor (empty metrics map), needed so MetricsSnapshotSerdeV2 can deserialize.
+   */
+  def this() = this(new HashMap[String, Map[String, Object]]())
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2.java b/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2.java
new file mode 100644
index 0000000000..6ab7ce8f4f
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * JSON serde for {@link MetricsSnapshot} that writes type information alongside values (Jackson
+ * default typing), so typed metric values can be read back as objects.
+ */
+public class MetricsSnapshotSerdeV2 implements Serde<MetricsSnapshot> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MetricsSnapshotSerdeV2.class);
+  private final ObjectMapper objectMapper;
+
+  public MetricsSnapshotSerdeV2() {
+    objectMapper = new ObjectMapper();
+    // NOTE(review): default typing embeds class names in the JSON and instantiates them on read.
+    // That is only safe when the bytes come from a trusted producer - confirm for the metrics stream.
+    objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.OBJECT_AND_NON_CONCRETE);
+  }
+
+  /**
+   * @param bytes JSON produced by {@link #toBytes(MetricsSnapshot)}
+   * @return the deserialized snapshot, or null when the bytes cannot be read (the failure is logged)
+   */
+  @Override
+  public MetricsSnapshot fromBytes(byte[] bytes) {
+    try {
+      return MetricsSnapshot.fromMap(
+          objectMapper.readValue(bytes, new HashMap<String, Map<String, Object>>().getClass()));
+    } catch (IOException e) {
+      LOG.error("Exception while deserializing", e);
+    }
+    return null;
+  }
+
+  /**
+   * @param metricsSnapshot the snapshot to serialize
+   * @return UTF-8 encoded JSON, or null when serialization fails (the failure is logged)
+   */
+  @Override
+  public byte[] toBytes(MetricsSnapshot metricsSnapshot) {
+    try {
+      return objectMapper.writeValueAsString(convertMap(metricsSnapshot.getAsMap()))
+          .getBytes(StandardCharsets.UTF_8);
+    } catch (IOException e) {
+      LOG.error("Exception while serializing", e);
+    }
+    return null;
+  }
+
+  /**
+   * Recursively copies the given map, and every nested map value, into plain HashMaps.
+   * Metrics returns an UnmodifiableMap; unmodifiable maps should not be serialized with type
+   * information, because UnmodifiableMap cannot be deserialized.
+   */
+  private HashMap<String, Object> convertMap(Map<String, Object> map) {
+    HashMap<String, Object> retVal = new HashMap<>(map);
+    for (Map.Entry<String, Object> entry : map.entrySet()) {
+      if (entry.getValue() instanceof Map) {
+        retVal.put(entry.getKey(), convertMap((Map<String, Object>) entry.getValue()));
+      }
+    }
+    return retVal;
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2Factory.java b/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2Factory.java
new file mode 100644
index 0000000000..49e07705e2
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2Factory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+
+
+/**
+ * Factory that hands out {@link MetricsSnapshotSerdeV2} instances.
+ */
+public class MetricsSnapshotSerdeV2Factory implements SerdeFactory<MetricsSnapshot> {
+  /**
+   * @param name serde name; not used by this factory
+   * @param config job config; not used by this factory
+   * @return a new {@link MetricsSnapshotSerdeV2}
+   */
+  @Override
+  public Serde<MetricsSnapshot> getSerde(String name, Config config) {
+    return new MetricsSnapshotSerdeV2();
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
new file mode 100644
index 0000000000..e4255a7acf
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers.model.serializers;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.SamzaException;
+import org.apache.samza.diagnostics.DiagnosticsExceptionEvent;
+import org.apache.samza.metrics.ListGauge;
+import org.apache.samza.metrics.reporter.Metrics;
+import org.apache.samza.metrics.reporter.MetricsHeader;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestMetricsSnapshotSerdeV2 {
+
+  /**
+   * Round-trips a snapshot holding an exception list-gauge and a plain counter through the serde,
+   * then checks that the header and metrics maps are unchanged.
+   */
+  @Test
+  public void testSerde() {
+    MetricsHeader metricsHeader =
+        new MetricsHeader("jobName", "i001", "container 0", "source", "300.14.25.1", "1", "1", 1, 1);
+
+    ListGauge<DiagnosticsExceptionEvent> listGauge = new ListGauge<>("exceptions");
+    DiagnosticsExceptionEvent diagnosticsExceptionEvent1 =
+        new DiagnosticsExceptionEvent(1, new SamzaException("this is a samza exception", new RuntimeException("cause")),
+            new HashMap());
+
+    listGauge.add(diagnosticsExceptionEvent1);
+
+    String samzaContainerMetricsGroupName = "org.apache.samza.container.SamzaContainerMetrics";
+    Map<String, Map<String, Object>> metricMessage = new HashMap<>();
+    metricMessage.put(samzaContainerMetricsGroupName, new HashMap<>());
+    metricMessage.get(samzaContainerMetricsGroupName).put("exceptions", listGauge.getValues());
+    metricMessage.get(samzaContainerMetricsGroupName).put("commit-calls", 0);
+
+    MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new Metrics(metricMessage));
+
+    MetricsSnapshotSerdeV2 metricsSnapshotSerde = new MetricsSnapshotSerdeV2();
+    byte[] serializedBytes = metricsSnapshotSerde.toBytes(metricsSnapshot);
+
+    MetricsSnapshot deserializedMetricsSnapshot = metricsSnapshotSerde.fromBytes(serializedBytes);
+
+    // the serde returns null on failure; fail with a clear message instead of an NPE below
+    Assert.assertNotNull("Snapshot should deserialize", deserializedMetricsSnapshot);
+
+    Assert.assertEquals("Headers map should be equal",
+        metricsSnapshot.getHeader().getAsMap(), deserializedMetricsSnapshot.getHeader().getAsMap());
+
+    Assert.assertEquals("Metrics map should be equal",
+        metricsSnapshot.getMetrics().getAsMap(), deserializedMetricsSnapshot.getMetrics().getAsMap());
+  }
+}
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/SimpleDiagnosticsAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/SimpleDiagnosticsAppender.java
new file mode 100644
index 0000000000..31f0d47d8b
--- /dev/null
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/SimpleDiagnosticsAppender.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.diagnostics.DiagnosticsExceptionEvent;
+import org.apache.samza.metrics.ListGauge;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Provides an in-memory appender that parses LoggingEvents to filter events relevant to diagnostics.
+ * Currently, it filters exception-related events and updates an exception metric ({@link ListGauge}) in
+ * {@link SamzaContainerMetrics}.
+ *
+ * When used in conjunction with {@link org.apache.samza.metrics.reporter.MetricsSnapshotReporter}, it provides a
+ * stream of diagnostics-related events.
+ */
+public class SimpleDiagnosticsAppender extends AppenderSkeleton {
+  private static final Logger LOG = LoggerFactory.getLogger(SimpleDiagnosticsAppender.class);
+
+  // simple object to synchronize root logger attachment
+  private static final Object SYNCHRONIZATION_OBJECT = new Object();
+  protected final ListGauge samzaContainerExceptionMetric;
+
+  /**
+   * A simple log4j1.2.* appender, which attaches itself to the root logger.
+   * Attachment to the root logger is thread safe.
+   *
+   * @param samzaContainerMetrics metrics whose exceptions gauge receives the exception events
+   */
+  public SimpleDiagnosticsAppender(SamzaContainerMetrics samzaContainerMetrics) {
+    this.samzaContainerExceptionMetric = samzaContainerMetrics.exceptions();
+    this.setName(SimpleDiagnosticsAppender.class.getName());
+
+    synchronized (SYNCHRONIZATION_OBJECT) {
+      this.attachAppenderToRootLogger();
+    }
+  }
+
+  /**
+   * Adds this appender to the root logger unless an appender with the same name is already attached.
+   * NOTE: when one is already attached, this instance is not added, so its gauge receives no events.
+   */
+  private void attachAppenderToRootLogger() {
+    // ensure appender is attached only once per JVM (regardless of #containers)
+    if (org.apache.log4j.Logger.getRootLogger().getAppender(SimpleDiagnosticsAppender.class.getName()) == null) {
+      LOG.info("Attaching diagnostics appender to root logger");
+      org.apache.log4j.Logger.getRootLogger().addAppender(this);
+    }
+  }
+
+  /**
+   * Records an event that carries a throwable as a {@link DiagnosticsExceptionEvent} in the
+   * exceptions gauge; other events are only noted at debug level. Exceptions are never propagated.
+   */
+  @Override
+  protected void append(LoggingEvent loggingEvent) {
+    // This appender sits on the root logger, so (when slf4j is bound to log4j) its own LOG calls
+    // below come straight back into append(). Skip them, otherwise debug logging recurses forever.
+    if (LOG.getName().equals(loggingEvent.getLoggerName())) {
+      return;
+    }
+
+    try {
+      // an event is an exception event only if it carries an actual throwable; ThrowableInformation
+      // may exist without one, which would otherwise fail inside DiagnosticsExceptionEvent
+      if (loggingEvent.getThrowableInformation() != null
+          && loggingEvent.getThrowableInformation().getThrowable() != null) {
+        DiagnosticsExceptionEvent diagnosticsExceptionEvent =
+            new DiagnosticsExceptionEvent(loggingEvent.timeStamp, loggingEvent.getThrowableInformation().getThrowable(),
+                loggingEvent.getProperties());
+
+        samzaContainerExceptionMetric.add(diagnosticsExceptionEvent);
+        LOG.debug("Received DiagnosticsExceptionEvent {}", diagnosticsExceptionEvent);
+      } else {
+        LOG.debug("Received non-exception event with message {}", loggingEvent.getMessage());
+      }
+    } catch (Exception e) {
+      // blanket catch of all exceptions so as to not impact any job
+      LOG.error("Exception in logging event parsing", e);
+    }
+  }
+
+  @Override
+  public void close() {
+    // Do nothing.
+  }
+
+  /**
+   * Returns false since this appender requires no layout.
+   * @return false
+   */
+  @Override
+  public boolean requiresLayout() {
+    return false;
+  }
+}