Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,53 @@ public TezConfiguration(boolean loadDefaults) {
public static final String TEZ_TASK_LOG_LEVEL = TEZ_TASK_PREFIX + "log.level";
public static final String TEZ_TASK_LOG_LEVEL_DEFAULT = "INFO";

/**
* By this option, user can easily override the logging pattern which is applied in
* TezContainerLogAppender in AM, regardless of the environmental settings.
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty
public static final String TEZ_LOG_PATTERN_LAYOUT_AM = TEZ_AM_PREFIX + "log.pattern.layout";

/**
* By this option, user can easily override the logging pattern which is applied in
* TezContainerLogAppender in tasks, regardless of the environmental settings.
*/
@ConfigurationScope(Scope.VERTEX)
@ConfigurationProperty
public static final String TEZ_LOG_PATTERN_LAYOUT_TASK = TEZ_TASK_PREFIX + "log.pattern.layout";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any default value for this? (e.g If the user wants to reset it back to old value)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, user should be able to turn this off by setting it to an empty string, I'm reflecting this in the configuration


/**
* Set pattern to empty string to turn the custom log pattern feature off.
*/
public static final String TEZ_LOG_PATTERN_LAYOUT_DEFAULT = "";

/**
* Comma separated list of keys, which can used for defining keys in MDC. The corresponding values
* will be read from Configuration, see tez.mdc.custom.keys.conf.props for further details.
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty
public static final String TEZ_MDC_CUSTOM_KEYS = TEZ_PREFIX + "mdc.custom.keys";
public static final String TEZ_MDC_CUSTOM_KEYS_DEFAULT = "";

/**
* Comma separated list of Configuration keys. Tez will try to fill MDC with key value pairs in a
* way that a key will be the nth item in tez.mdc.custom.keys and the value will be the value from
* a Configuration object pointed by the nth key of tez.mdc.custom.keys.conf.props like below:
*
* tez.mdc.custom.keys=queryId,otherKey
* tez.mdc.custom.keys.conf.props=awesome.sql.app.query.id,awesome.sql.app.other.key
*
* So MDC will contain key -{@literal >} value pairs as:
* queryId -{@literal >} conf.get("awesome.sql.app.query.id")
* otherKey -{@literal >} conf.get("awesome.sql.app.other.key")
*/
@ConfigurationScope(Scope.AM)
@ConfigurationProperty
public static final String TEZ_MDC_CUSTOM_KEYS_CONF_PROPS = TEZ_MDC_CUSTOM_KEYS + ".conf.props";
public static final String TEZ_MDC_CUSTOM_KEYS_CONF_PROPS_DEFAULT = "";

/**
* double value. Represents ratio of unique failed outputs / number of consumer
* tasks. When this condition or value mentioned in {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.log4j.Appender;
import org.apache.log4j.PatternLayout;
import org.apache.tez.common.io.NonSyncByteArrayOutputStream;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.records.TezDAGID;
Expand Down Expand Up @@ -157,17 +158,25 @@ private static String sanitizeString(String srcString) {
return res; // Number starts allowed rightnow
}

public static void updateLoggers(String addend) throws FileNotFoundException {
public static void updateLoggers(Configuration configuration, String addend, String patternString)
throws FileNotFoundException {

LOG.info("Redirecting log file based on addend: " + addend);

Appender appender = org.apache.log4j.Logger.getRootLogger().getAppender(
TezConstants.TEZ_CONTAINER_LOGGER_NAME);
Appender appender =
org.apache.log4j.Logger.getRootLogger().getAppender(TezConstants.TEZ_CONTAINER_LOGGER_NAME);
if (appender != null) {
if (appender instanceof TezContainerLogAppender) {
TezContainerLogAppender claAppender = (TezContainerLogAppender) appender;
claAppender.setLogFileName(constructLogFileName(
TezConstants.TEZ_CONTAINER_LOG_FILE_NAME, addend));
claAppender
.setLogFileName(constructLogFileName(TezConstants.TEZ_CONTAINER_LOG_FILE_NAME, addend));

// there was a configured pattern
if (patternString != null) {
PatternLayout layout = (PatternLayout) claAppender.getLayout();
layout.setConversionPattern(patternString);
}

claAppender.activateOptions();
} else {
LOG.warn("Appender is a " + appender.getClass() + "; require an instance of "
Expand Down
151 changes: 151 additions & 0 deletions tez-common/src/main/java/org/apache/tez/util/LoggingUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.util;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.helpers.ThreadLocalMap;
import org.apache.tez.dag.api.TezConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class LoggingUtils {
private static final Logger LOG = LoggerFactory.getLogger(LoggingUtils.class);

private LoggingUtils() {}

@SuppressWarnings("unchecked")
public static void initLoggingContext(ThreadLocalMap threadLocalMap, Configuration conf,
String dagId, String taskAttemptId) {
Hashtable<String, String> data = (Hashtable<String, String>) threadLocalMap.get();
if (data == null) {
data = new NonClonableHashtable<String, String>();
threadLocalMap.set(data);
}
data.put("dagId", dagId == null ? "" : dagId);
data.put("taskAttemptId", taskAttemptId == null ? "" : taskAttemptId);

String[] mdcKeys = conf.getStrings(TezConfiguration.TEZ_MDC_CUSTOM_KEYS,
TezConfiguration.TEZ_MDC_CUSTOM_KEYS_DEFAULT);

if (mdcKeys == null || mdcKeys.length == 0) {
return;
}

String[] mdcKeysValuesFrom = conf.getStrings(TezConfiguration.TEZ_MDC_CUSTOM_KEYS_CONF_PROPS,
TezConfiguration.TEZ_MDC_CUSTOM_KEYS_CONF_PROPS_DEFAULT);
LOG.info("MDC_LOGGING: setting up MDC keys: keys: {} / conf: {}", Arrays.asList(mdcKeys),
Arrays.asList(mdcKeysValuesFrom));

int i = 0;
for (String mdcKey : mdcKeys) {
// don't want to fail on incorrect mdc key settings, but warn in app logs
if (mdcKey.isEmpty() || mdcKeysValuesFrom.length < i + 1) {
LOG.warn("cannot set mdc key: {}", mdcKey);
break;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean break or continue here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we hit this it means that mdcKey is most probably empty, so we had:

tez.mdc.custom.keys=

as it's very unlikely that the user set something like this:

tez.mdc.custom.keys=,something_which_matters

if we split string by commas and find empty string, it makes no sense to loop further, or at least I cannot figure out a valid use-case

}

String mdcValue = mdcKeysValuesFrom[i] == null ? "" : conf.get(mdcKeysValuesFrom[i]);
// MDC is backed by a Hashtable, let's prevent NPE because of null values
if (mdcValue != null) {
data.put(mdcKey, mdcValue);
} else {
LOG.warn("MDC_LOGGING: mdc value is null for key: {}, config key: {}", mdcKey,
mdcKeysValuesFrom[i]);
}

i++;
}
}

public static String getPatternForAM(Configuration conf) {
String pattern =
conf.get(TezConfiguration.TEZ_LOG_PATTERN_LAYOUT_AM, TezConfiguration.TEZ_LOG_PATTERN_LAYOUT_DEFAULT);
return pattern.isEmpty() ? null : pattern;
}

public static String getPatternForTask(Configuration conf) {
String pattern =
conf.get(TezConfiguration.TEZ_LOG_PATTERN_LAYOUT_TASK, TezConfiguration.TEZ_LOG_PATTERN_LAYOUT_DEFAULT);
return pattern.isEmpty() ? null : pattern;
}

/**
* This method is for setting a NonClonableHashtable into log4j's mdc. Reflection hacks are
* needed, because MDC.mdc is well protected (final static MDC mdc = new MDC();). The logic below
* is supposed to be called once per JVM, so it's not a subject to performance bottlenecks. For
* further details of this solution, please check NonClonableHashtable class, which is set into
* the ThreadLocalMap. A wrong outcome of this method (any kind of runtime/reflection problems)
* should not affect the DAGAppMaster/TezChild. In case of an exception a ThreadLocalMap is
* returned, but it won't affect the content of the MDC.
*/
@SuppressWarnings("unchecked")
public static ThreadLocalMap setupLog4j() {
ThreadLocalMap mdcContext = new ThreadLocalMap();
mdcContext.set(new NonClonableHashtable<String, String>());

try {
final Constructor<?>[] constructors = org.apache.log4j.MDC.class.getDeclaredConstructors();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log4j or slf4j?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log4j, intentionally, the whole hack implemented in this method is log4j specific

for (Constructor<?> c : constructors) {
c.setAccessible(true);
}

org.apache.log4j.MDC mdc = (org.apache.log4j.MDC) constructors[0].newInstance();
Field tlmField = org.apache.log4j.MDC.class.getDeclaredField("tlm");
tlmField.setAccessible(true);
tlmField.set(mdc, mdcContext);

Field mdcField = org.apache.log4j.MDC.class.getDeclaredField("mdc");
mdcField.setAccessible(true);

Field modifiers = Field.class.getDeclaredField("modifiers");
modifiers.setAccessible(true);
modifiers.setInt(mdcField, mdcField.getModifiers() & ~Modifier.FINAL);

mdcField.set(null, mdc);

} catch (Exception e) {
LOG.warn("Cannot set log4j global MDC, mdcContext won't be applied to log4j's MDC class", e);
}

return mdcContext;
}

/**
* NonClonableHashtable is a special class for hacking the log4j MDC context. By design, log4j's
* MDC uses a ThreadLocalMap, which clones parent thread's context before propagating it to child
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this configurable via isThreadContextMapInheritable system property?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunately, isThreadContextMapInheritable is slightly different from what I needed:

Set the system property `log4j2.isThreadContextMapInheritable` to `true` to enable child threads to inherit the Thread Context Map.

this is used for inheritance, which is needed, child thread inherits parent's context, this is fine, the problem is that it inherits and clones the map, which is against my implementation...
my implementation makes it possible to define a global MDC context in DAGAppMaster and TezChild in the main thread, and all threads inherit that, but when a new dag (DAGAppMaster) or task attempt (TezChild) comes, modifying the global thread's context should have an effect on all threads' context, but with cloning, it won't have:
https://github.com/apache/log4j/blob/trunk/src/main/java/org/apache/log4j/helpers/ThreadLocalMap.java#L34-L41

instead, I choose to define 1 context in the main thread and propagating it to all child threads (which is automatic due to ThreadLocalMap behavior by default), but I only need to init the logging context once for every dag/taskattempt. Without cloning, a single change will change the MDC contents of all threads in the JVM.

* thread (see: @see {@link org.apache.log4j.helpers.ThreadLocalMap#childValue()}). In our
* usecase, this is not suitable, as we want to maintain only one context globally (and set e.g.
* dagId, taskAttemptId), then update it as easy as possible when dag/taskattempt changes, without
* having to propagate the update parameters to all the threads in the JVM.
*/
private static class NonClonableHashtable<K, V> extends Hashtable<String, String> {
private static final long serialVersionUID = 1L;

@Override
public synchronized Object clone() {
return this;
}
}
}
23 changes: 15 additions & 8 deletions tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.log4j.helpers.ThreadLocalMap;
import org.apache.tez.common.AsyncDispatcher;
import org.apache.tez.common.AsyncDispatcherConcurrent;
import org.apache.tez.common.GcTimeUpdater;
Expand Down Expand Up @@ -184,6 +185,7 @@
import org.apache.tez.dag.utils.Simple2LevelVersionComparator;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.hadoop.shim.HadoopShimsLoader;
import org.apache.tez.util.LoggingUtils;
import org.apache.tez.util.TezMxBeanResourceCalculator;
import org.codehaus.jettison.json.JSONException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -336,13 +338,15 @@ public class DAGAppMaster extends AbstractService {
// must be LinkedHashMap to preserve order of service addition
Map<Service, ServiceWithDependency> services =
new LinkedHashMap<Service, ServiceWithDependency>();
private ThreadLocalMap mdcContext;

public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
Clock clock, long appSubmitTime, boolean isSession, String workingDirectory,
String [] localDirs, String[] logDirs, String clientVersion,
Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) {
super(DAGAppMaster.class.getName());
this.mdcContext = LoggingUtils.setupLog4j();
this.clock = clock;
this.startTime = clock.getTime();
this.appSubmitTime = appSubmitTime;
Expand Down Expand Up @@ -690,7 +694,7 @@ protected TaskSchedulerManager getTaskSchedulerManager() {
private void handleInternalError(String errDiagnosticsPrefix, String errDiagDagEvent) {
state = DAGAppMasterState.ERROR;
if (currentDAG != null) {
_updateLoggers(currentDAG, "_post");
updateLoggers(currentDAG, "_post");
LOG.info(errDiagnosticsPrefix + ". Aborting dag: " + currentDAG.getID());
// Inform the current DAG about the error
sendEvent(new DAGEventInternalError(currentDAG.getID(), errDiagDagEvent));
Expand Down Expand Up @@ -760,15 +764,15 @@ protected synchronized void handle(DAGAppMasterEvent event) {
if (!isSession) {
LOG.info("Not a session, AM will unregister as DAG has completed");
this.taskSchedulerManager.setShouldUnregisterFlag();
_updateLoggers(currentDAG, "_post");
updateLoggers(currentDAG, "_post");
setStateOnDAGCompletion();
LOG.info("Shutting down on completion of dag:" + finishEvt.getDAGId());
shutdownHandler.shutdown();
} else {
LOG.info("DAG completed, dagId=" + finishEvt.getDAGId() + ", dagState="
+ finishEvt.getDAGState());
lastDAGCompletionTime = clock.getTime();
_updateLoggers(currentDAG, "_post");
updateLoggers(currentDAG, "_post");
if (this.historyEventHandler.hasRecoveryFailed()) {
String recoveryErrorMsg = "Recovery had a fatal error, shutting down session after" +
" DAG completion";
Expand Down Expand Up @@ -879,9 +883,10 @@ protected synchronized void handle(DAGAppMasterEvent event) {
}
}

private void _updateLoggers(DAG dag, String appender) {
private void updateLoggers(DAG dag, String appender) {
try {
TezUtilsInternal.updateLoggers(dag.getID().toString() + appender);
TezUtilsInternal.updateLoggers(dag.getConf(), dag.getID().toString() + appender,
LoggingUtils.getPatternForAM(dag.getConf()));
} catch (FileNotFoundException e) {
LOG.warn("Unable to update the logger. Continue with the old logger", e );
}
Expand Down Expand Up @@ -2007,7 +2012,7 @@ public void serviceStart() throws Exception {
+ ", state=" + (recoveredDAGData.dagState == null ? "null" :
recoveredDAGData.dagState)
+ ", failureReason=" + recoveredDAGData.reason);
_updateLoggers(recoveredDAGData.recoveredDAG, "");
updateLoggers(recoveredDAGData.recoveredDAG, "");
if (recoveredDAGData.nonRecoverable) {
addDiagnostic("DAG " + recoveredDAGData.recoveredDagID + " can not be recovered due to "
+ recoveredDAGData.reason);
Expand Down Expand Up @@ -2042,7 +2047,7 @@ public void serviceStart() throws Exception {
}
} else {
LOG.info("Found DAG to recover, dagId=" + recoveredDAGData.recoveredDAG.getID());
_updateLoggers(recoveredDAGData.recoveredDAG, "");
updateLoggers(recoveredDAGData.recoveredDAG, "");
DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(this.appAttemptID,
recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.recoveredDAG.getName(),
recoveredDAGData.recoveredDAG.getUserName(), this.clock.getTime(), this.containerLogs);
Expand Down Expand Up @@ -2467,7 +2472,9 @@ private void startDAG(DAGPlan dagPlan, Map<String, LocalResource> additionalAMRe

// /////////////////// Create the job itself.
final DAG newDAG = createDAG(dagPlan);
_updateLoggers(newDAG, "");
LoggingUtils.initLoggingContext(mdcContext, newDAG.getConf(), newDAG.getID().toString(), null);

updateLoggers(newDAG, "");
if (LOG.isDebugEnabled()) {
LOG.debug("Running a DAG with " + dagPlan.getVertexCount()
+ " vertices ");
Expand Down
Loading