-
Notifications
You must be signed in to change notification settings - Fork 331
JIRA: SAMZA-1733 Populating ListGauge metric using DiagnosticsAppender for exceptions #543
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 41 commits
5ae1df9
5a3ef7b
75ef5d2
1efed5a
d2922c5
6602fa0
054d507
1f08eae
b598b92
6f19ba0
c22e252
4f08cd1
9f98c57
906afeb
bdc85d4
5c1ed0a
c1884e4
b0695b4
508b6b3
c7821ba
9ba8e5b
64b1cb8
fd77c53
cc4d881
172ab20
863d850
cac4ecb
925473c
2ad0264
60f281c
cd58669
8a66d16
48cbf3e
346affd
7f7885b
92a29b4
e9f4004
28c4fec
edbd200
f8ace28
a4d11b0
6e274e7
7c727f7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,7 +30,6 @@ import java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorServic | |
|
|
||
| import com.google.common.annotations.VisibleForTesting | ||
| import com.google.common.util.concurrent.ThreadFactoryBuilder | ||
| import org.apache.commons.lang3.exception.ExceptionUtils | ||
| import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics} | ||
| import org.apache.samza.config.JobConfig.Config2Job | ||
| import org.apache.samza.config.MetricsConfig.Config2Metrics | ||
|
|
@@ -53,8 +52,7 @@ import org.apache.samza.system._ | |
| import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, RoundRobinChooserFactory} | ||
| import org.apache.samza.table.TableManager | ||
| import org.apache.samza.task._ | ||
| import org.apache.samza.util.Util | ||
| import org.apache.samza.util._ | ||
| import org.apache.samza.util.{Util, _} | ||
| import org.apache.samza.{SamzaContainerStatus, SamzaException} | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
|
|
@@ -757,6 +755,7 @@ class SamzaContainer( | |
| jmxServer = new JmxServer() | ||
|
|
||
| startMetrics | ||
| startDiagnostics | ||
| startAdmins | ||
| startOffsetManager | ||
| startLocalityManager | ||
|
|
@@ -904,6 +903,14 @@ class SamzaContainer( | |
| }) | ||
| } | ||
|
|
||
| def startDiagnostics { | ||
| if (containerContext.config.getDiagnosticsEnabled) { | ||
| info("Starting diagnostics.") | ||
| val diagnosticsAppender = Class.forName(containerContext.config.getDiagnosticsAppenderClass.get). | ||
|
||
| getDeclaredConstructor(classOf[SamzaContainerMetrics]).newInstance(this.metrics); | ||
| } | ||
| } | ||
|
|
||
| def startOffsetManager { | ||
| info("Registering task instances with offsets.") | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,86 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.samza.diagnostics; | ||
|
|
||
| import java.util.Arrays; | ||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
|
|
||
|
|
||
| /** | ||
| * This class encapsulates information related to an exception event that is useful for diagnostics. | ||
| * It used to define container, task, and other metrics as | ||
| * {@link org.apache.samza.metrics.ListGauge} of type {@link DiagnosticsExceptionEvent}. | ||
| */ | ||
| public class DiagnosticsExceptionEvent { | ||
|
|
||
| private long timestamp; // the timestamp associated with this exception | ||
| private Class exceptionType; // store the exception type separately | ||
| private Throwable throwable; | ||
| private Map mdcMap; | ||
| // the MDC map associated with this exception, used to store/obtain any context associated with the throwable | ||
|
|
||
| public DiagnosticsExceptionEvent() { | ||
| } | ||
|
|
||
| public DiagnosticsExceptionEvent(long timestampMillis, Throwable throwable, Map mdcMap) { | ||
| this.throwable = throwable; | ||
| this.exceptionType = throwable.getClass(); | ||
| this.timestamp = timestampMillis; | ||
| this.mdcMap = new HashMap(mdcMap); | ||
| } | ||
|
|
||
| public long getTimestamp() { | ||
| return timestamp; | ||
| } | ||
|
|
||
| public Throwable getThrowable() { | ||
| return this.throwable; | ||
| } | ||
|
|
||
| public Class getExceptionType() { | ||
| return this.exceptionType; | ||
| } | ||
|
|
||
| public Map getMdcMap() { | ||
| return mdcMap; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) { | ||
| if (this == o) { | ||
| return true; | ||
| } | ||
| if (o == null || getClass() != o.getClass()) { | ||
| return false; | ||
| } | ||
| DiagnosticsExceptionEvent that = (DiagnosticsExceptionEvent) o; | ||
|
|
||
| // Throwable provides no equals impl, so we assume message & stacktrace equality suffices | ||
| return timestamp == that.timestamp && this.exceptionType.equals(that.exceptionType) && mdcMap.equals(that.mdcMap) | ||
| && this.throwable.getMessage().equals(that.throwable.getMessage()) && Arrays.equals( | ||
| this.throwable.getStackTrace(), that.throwable.getStackTrace()); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(timestamp, exceptionType, throwable, mdcMap); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,9 +19,7 @@ | |
|
|
||
| package org.apache.samza.metrics.reporter | ||
|
|
||
| import java.util.Collections | ||
| import java.util.HashMap | ||
| import java.util.Map | ||
| import java.util.{Collections, HashMap, Map} | ||
| import scala.collection.JavaConverters._ | ||
|
|
||
| object Metrics { | ||
|
|
@@ -52,4 +50,9 @@ class Metrics(metrics: Map[String, Map[String, Object]]) { | |
| def get(group: String) = immutableMetrics.get(group) | ||
|
|
||
| def getAsMap(): Map[String, Map[String, Object]] = Collections.unmodifiableMap(immutableMetrics) | ||
|
|
||
| // default constructor to enable deserialization by MetricsSnapshotSerdeV2 | ||
| def this() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did a code search and couldn't find any reference to this default constructor. What's the reason to add it? |
||
| this(new HashMap[String, Map[String, Object]]()) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.samza.serializers; | ||
|
|
||
| import java.io.IOException; | ||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import org.apache.samza.metrics.reporter.MetricsSnapshot; | ||
| import org.codehaus.jackson.map.ObjectMapper; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
||
| public class MetricsSnapshotSerdeV2 implements Serde<MetricsSnapshot> { | ||
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(MetricsSnapshotSerdeV2.class); | ||
| private final ObjectMapper objectMapper; | ||
|
|
||
| public MetricsSnapshotSerdeV2() { | ||
| objectMapper = new ObjectMapper(); | ||
| objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.OBJECT_AND_NON_CONCRETE); | ||
| } | ||
|
|
||
| @Override | ||
| public MetricsSnapshot fromBytes(byte[] bytes) { | ||
| try { | ||
| return MetricsSnapshot.fromMap( | ||
| objectMapper.readValue(bytes, new HashMap<String, Map<String, Object>>().getClass())); | ||
| } catch (IOException e) { | ||
| LOG.info("Exception while deserializing", e); | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public byte[] toBytes(MetricsSnapshot metricsSnapshot) { | ||
| try { | ||
| return objectMapper.writeValueAsString(convertMap(metricsSnapshot.getAsMap())).getBytes("UTF-8"); | ||
| } catch (IOException e) { | ||
| LOG.info("Exception while serializing", e); | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| /** Metrics returns an UnmodifiableMap. | ||
| * Unmodifiable maps should not be serialized with type, because UnmodifiableMap cannot be deserialized. | ||
| * So we convert to HashMap. | ||
| */ | ||
| private HashMap convertMap(Map<String, Object> map) { | ||
| HashMap retVal = new HashMap(map); | ||
| for (Map.Entry<String, Object> entry : map.entrySet()) { | ||
| if (entry.getValue() instanceof Map) { | ||
| retVal.put(entry.getKey(), convertMap((Map<String, Object>) entry.getValue())); | ||
| } | ||
| } | ||
| return retVal; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.samza.serializers; | ||
|
|
||
| import org.apache.samza.config.Config; | ||
| import org.apache.samza.metrics.reporter.MetricsSnapshot; | ||
|
|
||
|
|
||
| public class MetricsSnapshotSerdeV2Factory implements SerdeFactory<MetricsSnapshot> { | ||
| @Override | ||
| public Serde<MetricsSnapshot> getSerde(String name, Config config) { | ||
| return new MetricsSnapshotSerdeV2(); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.samza.serializers.model.serializers; | ||
|
|
||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import org.apache.samza.SamzaException; | ||
| import org.apache.samza.diagnostics.DiagnosticsExceptionEvent; | ||
| import org.apache.samza.metrics.ListGauge; | ||
| import org.apache.samza.metrics.reporter.Metrics; | ||
| import org.apache.samza.metrics.reporter.MetricsHeader; | ||
| import org.apache.samza.metrics.reporter.MetricsSnapshot; | ||
| import org.apache.samza.serializers.MetricsSnapshotSerdeV2; | ||
| import org.junit.Assert; | ||
| import org.junit.Test; | ||
|
|
||
|
|
||
| public class TestMetricsSnapshotSerdeV2 { | ||
|
|
||
| @Test | ||
| public void testSerde() { | ||
| MetricsHeader metricsHeader = | ||
| new MetricsHeader("jobName", "i001", "container 0", "source", "300.14.25.1", "1", "1", 1, 1); | ||
|
|
||
| ListGauge listGauge = new ListGauge<DiagnosticsExceptionEvent>("exceptions"); | ||
| DiagnosticsExceptionEvent diagnosticsExceptionEvent1 = | ||
| new DiagnosticsExceptionEvent(1, new SamzaException("this is a samza exception", new RuntimeException("cause")), | ||
| new HashMap()); | ||
|
|
||
| listGauge.add(diagnosticsExceptionEvent1); | ||
|
|
||
| String samzaContainerMetricsGroupName = "org.apache.samza.container.SamzaContainerMetrics"; | ||
| Map<String, Map<String, Object>> metricMessage = new HashMap<>(); | ||
| metricMessage.put(samzaContainerMetricsGroupName, new HashMap<>()); | ||
| metricMessage.get(samzaContainerMetricsGroupName).put("exceptions", listGauge.getValues()); | ||
| metricMessage.get(samzaContainerMetricsGroupName).put("commit-calls", 0); | ||
|
|
||
| MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new Metrics(metricMessage)); | ||
|
|
||
| MetricsSnapshotSerdeV2 metricsSnapshotSerde = new MetricsSnapshotSerdeV2(); | ||
| byte[] serializedBytes = metricsSnapshotSerde.toBytes(metricsSnapshot); | ||
|
|
||
| MetricsSnapshot deserializedMetricsSnapshot = metricsSnapshotSerde.fromBytes(serializedBytes); | ||
|
|
||
| Assert.assertTrue("Headers map should be equal", | ||
| metricsSnapshot.getHeader().getAsMap().equals(deserializedMetricsSnapshot.getHeader().getAsMap())); | ||
|
|
||
| Assert.assertTrue("Metrics map should be equal", | ||
| metricsSnapshot.getMetrics().getAsMap().equals(deserializedMetricsSnapshot.getMetrics().getAsMap())); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that it would make sense to provide a default class name here as well, for log4j 1.2.x (since in open source, we still depend on log4j 1.2.x).