Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
5ae1df9
Added exception metric (as string-guage), to be emitted using the sna…
rmatharu May 10, 2018
5a3ef7b
Adding ListGauge and integration with SamzaContainerMetric
rmatharu May 30, 2018
75ef5d2
Typifying ListGauge to ListGauge<T>, adding an eviction policy, defau…
rmatharu May 31, 2018
1efed5a
Adding metric type ListGauge
rmatharu May 10, 2018
d2922c5
Merge branch 'listgauge' of https://github.com/rayman7718/samza into …
rmatharu May 31, 2018
6602fa0
Adding DiagnosticsAppender, Populating ListGauge exception metric (sa…
rmatharu Jun 1, 2018
054d507
Adding DiagnosticsExceptionEventEvictionPolicy to evict stale events …
rmatharu Jun 2, 2018
1f08eae
Removing remove() and policy-configurability from ListGauge, exposing…
rmatharu Jun 4, 2018
b598b92
Merge branch 'listgauge' into diagnosticsappender
rmatharu Jun 4, 2018
6f19ba0
Renaming valueList to elements, and getValue to getValues
rmatharu Jun 4, 2018
c22e252
Merge branch 'listgauge' into diagnosticsappender
rmatharu Jun 4, 2018
4f08cd1
Removing the interface for ListGaugeEvictionPolicy
rmatharu Jun 5, 2018
9f98c57
Merge branch 'listgauge' into diagnosticsappender
rmatharu Jun 5, 2018
906afeb
Removing ValueInfo class and moving TimestampedValue to samza-api util
rmatharu Jun 5, 2018
bdc85d4
Removing ValueInfo class and moving TimestampedValue to samza-api util
rmatharu Jun 5, 2018
5c1ed0a
Removing ValueInfo class and moving TimestampedValue to samza-api util
rmatharu Jun 5, 2018
c1884e4
Merge branch 'listgauge' of https://github.com/rayman7718/samza into …
rmatharu Jun 5, 2018
b0695b4
Merge branch 'listgauge' into diagnosticsappender
rmatharu Jun 5, 2018
508b6b3
Renaming diagnostics.appender.enable to samza.diagnostics.enabled
rmatharu Jun 5, 2018
c7821ba
Removing periodic pruning from Listgauge, pruning on add and get in L…
rmatharu Jun 5, 2018
9ba8e5b
Updating testListGauge for new listgauge
rmatharu Jun 5, 2018
64b1cb8
Merge branch 'listgauge' into diagnosticsappender
rmatharu Jun 6, 2018
fd77c53
Complete removing evictionpolicy separation, merging all eviction log…
rmatharu Jun 6, 2018
cc4d881
Reverting superflous comment changes, indentation changes
rmatharu Jun 8, 2018
172ab20
Merge branch 'listgauge' into diagnosticsappender
rmatharu Jun 8, 2018
863d850
Minor changes
rmatharu Jun 9, 2018
cac4ecb
Merge branch 'listgauge' into diagnosticsappender
rmatharu Jun 9, 2018
925473c
Adding throwable cause messages and class names to exception event
rmatharu Jun 9, 2018
2ad0264
Fixing minor bug around diagnostic-event-initialization, adding blank…
rmatharu Jun 13, 2018
60f281c
Merge branch 'master' of https://github.com/apache/samza into diagnos…
rmatharu Jun 21, 2018
cd58669
Removing thread name
rmatharu Jun 21, 2018
8a66d16
Adding throwableInformation to DiagnosticsExceptionEvent, removing ot…
rmatharu Jun 22, 2018
48cbf3e
Moving diagnostics.enabled to jobconfig, adding appender-add check, m…
rmatharu Jun 29, 2018
346affd
Adding MDCMap to DiagnosticExceptionEvent for storing context
rmatharu Jul 10, 2018
7f7885b
Adding default constructor to enable serializing and deserializing
rmatharu Jul 11, 2018
92a29b4
Adding SnapshotSerdeV2 to enable serializing of ExceptionEvents for d…
rmatharu Jul 11, 2018
e9f4004
Serializing exceptions as throwables, storing classtype separately.
rmatharu Jul 14, 2018
28c4fec
Minor changes to ensure build
rmatharu Jul 14, 2018
edbd200
Fixing dependencies in Metrics.scala
rmatharu Jul 23, 2018
f8ace28
Moving addAppender to DiagnosticsAppender, using config and reflectio…
rmatharu Jul 24, 2018
a4d11b0
efactoring useless code out
rmatharu Jul 24, 2018
6e274e7
Making appender-attachment thread safe, adding error logging for appe…
rmatharu Jul 26, 2018
7c727f7
Adding constant to store class, adding class name to log message
rmatharu Jul 26, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ object JobConfig {
// across application restarts
val JOB_LOGGED_STORE_BASE_DIR = "job.logged.store.base.dir"

// Enables diagnostic appender for logging exception events
val JOB_DIAGNOSTICS_ENABLED = "job.diagnostics.enabled"

// Specify DiagnosticAppender class
val DIAGNOSTICS_APPENDER_CLASS = "job.diagnostics.appender.class"
val DEFAULT_DIAGNOSTICS_APPENDER_CLASS = "org.apache.samza.logging.log4j.SimpleDiagnosticsAppender"

implicit def Config2Job(config: Config) = new JobConfig(config)

/**
Expand Down Expand Up @@ -186,4 +193,10 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
def getNonLoggedStorePath = getOption(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR)

def getLoggedStorePath = getOption(JobConfig.JOB_LOGGED_STORE_BASE_DIR)

def getDiagnosticsEnabled = { getBoolean(JobConfig.JOB_DIAGNOSTICS_ENABLED, false) }

def getDiagnosticsAppenderClass = {
getOrDefault(JobConfig.DIAGNOSTICS_APPENDER_CLASS, JobConfig.DEFAULT_DIAGNOSTICS_APPENDER_CLASS)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.apache.samza.container

import java.io.File
import java.lang.management.ManagementFactory
import java.lang.reflect.InvocationTargetException
import java.net.{URL, UnknownHostException}
import java.nio.file.Path
import java.time.Duration
Expand All @@ -30,7 +31,6 @@ import java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorServic

import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.MetricsConfig.Config2Metrics
Expand All @@ -53,8 +53,7 @@ import org.apache.samza.system._
import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, RoundRobinChooserFactory}
import org.apache.samza.table.TableManager
import org.apache.samza.task._
import org.apache.samza.util.Util
import org.apache.samza.util._
import org.apache.samza.util.{Util, _}
import org.apache.samza.{SamzaContainerStatus, SamzaException}

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -757,6 +756,7 @@ class SamzaContainer(
jmxServer = new JmxServer()

startMetrics
startDiagnostics
startAdmins
startOffsetManager
startLocalityManager
Expand Down Expand Up @@ -904,6 +904,24 @@ class SamzaContainer(
})
}

def startDiagnostics {
if (containerContext.config.getDiagnosticsEnabled) {
info("Starting diagnostics.")

try {
val diagnosticsAppender = Class.forName(containerContext.config.getDiagnosticsAppenderClass).
getDeclaredConstructor(classOf[SamzaContainerMetrics]).newInstance(this.metrics);
}
catch {
case e@(_: ClassNotFoundException | _: InstantiationException | _: InvocationTargetException) => {
error("Failed to instantiate diagnostic appender", e)
throw new ConfigException("Failed to instantiate diagnostic appender class " +
containerContext.config.getDiagnosticsAppenderClass, e)
}
}
}
}

def startOffsetManager {
info("Registering task instances with offsets.")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.apache.samza.container

import java.util

import org.apache.samza.diagnostics.DiagnosticsExceptionEvent
import org.apache.samza.metrics.{Gauge, ReadableMetricsRegistry, MetricsRegistryMap, MetricsHelper}

class SamzaContainerMetrics(
Expand Down Expand Up @@ -48,7 +49,7 @@ class SamzaContainerMetrics(

val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]()

val exceptions = newListGauge[String]("exceptions")
val exceptions = newListGauge[DiagnosticsExceptionEvent]("exceptions")

def addStoreRestorationGauge(taskName: TaskName, storeName: String) {
taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.diagnostics;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;


/**
* This class encapsulates information related to an exception event that is useful for diagnostics.
* It used to define container, task, and other metrics as
* {@link org.apache.samza.metrics.ListGauge} of type {@link DiagnosticsExceptionEvent}.
*/
public class DiagnosticsExceptionEvent {

private long timestamp; // the timestamp associated with this exception
private Class exceptionType; // store the exception type separately
private Throwable throwable;
private Map mdcMap;
// the MDC map associated with this exception, used to store/obtain any context associated with the throwable

public DiagnosticsExceptionEvent() {
}

public DiagnosticsExceptionEvent(long timestampMillis, Throwable throwable, Map mdcMap) {
this.throwable = throwable;
this.exceptionType = throwable.getClass();
this.timestamp = timestampMillis;
this.mdcMap = new HashMap(mdcMap);
}

public long getTimestamp() {
return timestamp;
}

public Throwable getThrowable() {
return this.throwable;
}

public Class getExceptionType() {
return this.exceptionType;
}

public Map getMdcMap() {
return mdcMap;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DiagnosticsExceptionEvent that = (DiagnosticsExceptionEvent) o;

// Throwable provides no equals impl, so we assume message & stacktrace equality suffices
return timestamp == that.timestamp && this.exceptionType.equals(that.exceptionType) && mdcMap.equals(that.mdcMap)
&& this.throwable.getMessage().equals(that.throwable.getMessage()) && Arrays.equals(
this.throwable.getStackTrace(), that.throwable.getStackTrace());
}

@Override
public int hashCode() {
return Objects.hash(timestamp, exceptionType, throwable, mdcMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@

package org.apache.samza.metrics.reporter

import java.util.Collections
import java.util.HashMap
import java.util.Map
import java.util.{Collections, HashMap, Map}
import scala.collection.JavaConverters._

object Metrics {
Expand Down Expand Up @@ -52,4 +50,9 @@ class Metrics(metrics: Map[String, Map[String, Object]]) {
def get(group: String) = immutableMetrics.get(group)

def getAsMap(): Map[String, Map[String, Object]] = Collections.unmodifiableMap(immutableMetrics)

// default constructor to enable deserialization by MetricsSnapshotSerdeV2
def this() {
this(new HashMap[String, Map[String, Object]]())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.serializers;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.samza.metrics.reporter.MetricsSnapshot;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class MetricsSnapshotSerdeV2 implements Serde<MetricsSnapshot> {

private static final Logger LOG = LoggerFactory.getLogger(MetricsSnapshotSerdeV2.class);
private final ObjectMapper objectMapper;

public MetricsSnapshotSerdeV2() {
objectMapper = new ObjectMapper();
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.OBJECT_AND_NON_CONCRETE);
}

@Override
public MetricsSnapshot fromBytes(byte[] bytes) {
try {
return MetricsSnapshot.fromMap(
objectMapper.readValue(bytes, new HashMap<String, Map<String, Object>>().getClass()));
} catch (IOException e) {
LOG.info("Exception while deserializing", e);
}
return null;
}

@Override
public byte[] toBytes(MetricsSnapshot metricsSnapshot) {
try {
return objectMapper.writeValueAsString(convertMap(metricsSnapshot.getAsMap())).getBytes("UTF-8");
} catch (IOException e) {
LOG.info("Exception while serializing", e);
}
return null;
}

/** Metrics returns an UnmodifiableMap.
* Unmodifiable maps should not be serialized with type, because UnmodifiableMap cannot be deserialized.
* So we convert to HashMap.
*/
private HashMap convertMap(Map<String, Object> map) {
HashMap retVal = new HashMap(map);
for (Map.Entry<String, Object> entry : map.entrySet()) {
if (entry.getValue() instanceof Map) {
retVal.put(entry.getKey(), convertMap((Map<String, Object>) entry.getValue()));
}
}
return retVal;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.serializers;

import org.apache.samza.config.Config;
import org.apache.samza.metrics.reporter.MetricsSnapshot;


public class MetricsSnapshotSerdeV2Factory implements SerdeFactory<MetricsSnapshot> {
@Override
public Serde<MetricsSnapshot> getSerde(String name, Config config) {
return new MetricsSnapshotSerdeV2();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.serializers.model.serializers;

import java.util.HashMap;
import java.util.Map;
import org.apache.samza.SamzaException;
import org.apache.samza.diagnostics.DiagnosticsExceptionEvent;
import org.apache.samza.metrics.ListGauge;
import org.apache.samza.metrics.reporter.Metrics;
import org.apache.samza.metrics.reporter.MetricsHeader;
import org.apache.samza.metrics.reporter.MetricsSnapshot;
import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
import org.junit.Assert;
import org.junit.Test;


public class TestMetricsSnapshotSerdeV2 {

@Test
public void testSerde() {
MetricsHeader metricsHeader =
new MetricsHeader("jobName", "i001", "container 0", "source", "300.14.25.1", "1", "1", 1, 1);

ListGauge listGauge = new ListGauge<DiagnosticsExceptionEvent>("exceptions");
DiagnosticsExceptionEvent diagnosticsExceptionEvent1 =
new DiagnosticsExceptionEvent(1, new SamzaException("this is a samza exception", new RuntimeException("cause")),
new HashMap());

listGauge.add(diagnosticsExceptionEvent1);

String samzaContainerMetricsGroupName = "org.apache.samza.container.SamzaContainerMetrics";
Map<String, Map<String, Object>> metricMessage = new HashMap<>();
metricMessage.put(samzaContainerMetricsGroupName, new HashMap<>());
metricMessage.get(samzaContainerMetricsGroupName).put("exceptions", listGauge.getValues());
metricMessage.get(samzaContainerMetricsGroupName).put("commit-calls", 0);

MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new Metrics(metricMessage));

MetricsSnapshotSerdeV2 metricsSnapshotSerde = new MetricsSnapshotSerdeV2();
byte[] serializedBytes = metricsSnapshotSerde.toBytes(metricsSnapshot);

MetricsSnapshot deserializedMetricsSnapshot = metricsSnapshotSerde.fromBytes(serializedBytes);

Assert.assertTrue("Headers map should be equal",
metricsSnapshot.getHeader().getAsMap().equals(deserializedMetricsSnapshot.getHeader().getAsMap()));

Assert.assertTrue("Metrics map should be equal",
metricsSnapshot.getMetrics().getAsMap().equals(deserializedMetricsSnapshot.getMetrics().getAsMap()));
}
}
Loading