diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 1ffd70a3ff..cd6d02249e 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -260,6 +260,53 @@ public TezConfiguration(boolean loadDefaults) { public static final String TEZ_TASK_LOG_LEVEL = TEZ_TASK_PREFIX + "log.level"; public static final String TEZ_TASK_LOG_LEVEL_DEFAULT = "INFO"; + /** + * By this option, user can easily override the logging pattern which is applied in + * TezContainerLogAppender in AM, regardless of the environmental settings. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty + public static final String TEZ_LOG_PATTERN_LAYOUT_AM = TEZ_AM_PREFIX + "log.pattern.layout"; + + /** + * By this option, user can easily override the logging pattern which is applied in + * TezContainerLogAppender in tasks, regardless of the environmental settings. + */ + @ConfigurationScope(Scope.VERTEX) + @ConfigurationProperty + public static final String TEZ_LOG_PATTERN_LAYOUT_TASK = TEZ_TASK_PREFIX + "log.pattern.layout"; + + /** + * Set pattern to empty string to turn the custom log pattern feature off. + */ + public static final String TEZ_LOG_PATTERN_LAYOUT_DEFAULT = ""; + + /** + * Comma separated list of keys, which can be used for defining keys in MDC. The corresponding values + * will be read from Configuration, see tez.mdc.custom.keys.conf.props for further details. + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty + public static final String TEZ_MDC_CUSTOM_KEYS = TEZ_PREFIX + "mdc.custom.keys"; + public static final String TEZ_MDC_CUSTOM_KEYS_DEFAULT = ""; + + /** + * Comma separated list of Configuration keys. 
Tez will try to fill MDC with key value pairs in a + * way that a key will be the nth item in tez.mdc.custom.keys and the value will be the value from + * a Configuration object pointed by the nth key of tez.mdc.custom.keys.conf.props like below: + * + * tez.mdc.custom.keys=queryId,otherKey + * tez.mdc.custom.keys.conf.props=awesome.sql.app.query.id,awesome.sql.app.other.key + * + * So MDC will contain key -{@literal >} value pairs as: + * queryId -{@literal >} conf.get("awesome.sql.app.query.id") + * otherKey -{@literal >} conf.get("awesome.sql.app.other.key") + */ + @ConfigurationScope(Scope.AM) + @ConfigurationProperty + public static final String TEZ_MDC_CUSTOM_KEYS_CONF_PROPS = TEZ_MDC_CUSTOM_KEYS + ".conf.props"; + public static final String TEZ_MDC_CUSTOM_KEYS_CONF_PROPS_DEFAULT = ""; + /** * double value. Represents ratio of unique failed outputs / number of consumer * tasks. When this condition or value mentioned in {@link diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java index c2efb29cb6..7ee5bb457e 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java @@ -47,6 +47,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.log4j.Appender; +import org.apache.log4j.PatternLayout; import org.apache.tez.common.io.NonSyncByteArrayOutputStream; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.records.TezDAGID; @@ -157,17 +158,25 @@ private static String sanitizeString(String srcString) { return res; // Number starts allowed rightnow } - public static void updateLoggers(String addend) throws FileNotFoundException { + public static void updateLoggers(Configuration configuration, String addend, String patternString) + throws FileNotFoundException { LOG.info("Redirecting log 
file based on addend: " + addend); - Appender appender = org.apache.log4j.Logger.getRootLogger().getAppender( - TezConstants.TEZ_CONTAINER_LOGGER_NAME); + Appender appender = + org.apache.log4j.Logger.getRootLogger().getAppender(TezConstants.TEZ_CONTAINER_LOGGER_NAME); if (appender != null) { if (appender instanceof TezContainerLogAppender) { TezContainerLogAppender claAppender = (TezContainerLogAppender) appender; - claAppender.setLogFileName(constructLogFileName( - TezConstants.TEZ_CONTAINER_LOG_FILE_NAME, addend)); + claAppender + .setLogFileName(constructLogFileName(TezConstants.TEZ_CONTAINER_LOG_FILE_NAME, addend)); + // there was a configured pattern; only a PatternLayout accepts a conversion pattern + if (patternString != null && claAppender.getLayout() instanceof PatternLayout) { + ((PatternLayout) claAppender.getLayout()).setConversionPattern(patternString); + } else if (patternString != null) { + LOG.warn("Cannot apply log pattern, layout is not a PatternLayout: " + claAppender.getLayout()); + } + claAppender.activateOptions(); } else { LOG.warn("Appender is a " + appender.getClass() + "; require an instance of " diff --git a/tez-common/src/main/java/org/apache/tez/util/LoggingUtils.java b/tez-common/src/main/java/org/apache/tez/util/LoggingUtils.java new file mode 100644 index 0000000000..e09b6b0964 --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/util/LoggingUtils.java @@ -0,0 +1,151 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.util; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.Hashtable; + +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.helpers.ThreadLocalMap; +import org.apache.tez.dag.api.TezConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class LoggingUtils { + private static final Logger LOG = LoggerFactory.getLogger(LoggingUtils.class); + + private LoggingUtils() {} + + @SuppressWarnings("unchecked") + public static void initLoggingContext(ThreadLocalMap threadLocalMap, Configuration conf, + String dagId, String taskAttemptId) { + Hashtable data = (Hashtable) threadLocalMap.get(); + if (data == null) { + data = new NonClonableHashtable(); + threadLocalMap.set(data); + } + data.put("dagId", dagId == null ? "" : dagId); + data.put("taskAttemptId", taskAttemptId == null ? "" : taskAttemptId); + + String[] mdcKeys = conf.getStrings(TezConfiguration.TEZ_MDC_CUSTOM_KEYS, + TezConfiguration.TEZ_MDC_CUSTOM_KEYS_DEFAULT); + + if (mdcKeys == null || mdcKeys.length == 0) { + return; + } + + String[] mdcKeysValuesFrom = conf.getStrings(TezConfiguration.TEZ_MDC_CUSTOM_KEYS_CONF_PROPS, + TezConfiguration.TEZ_MDC_CUSTOM_KEYS_CONF_PROPS_DEFAULT); + LOG.info("MDC_LOGGING: setting up MDC keys: keys: {} / conf: {}", Arrays.toString(mdcKeys), + Arrays.toString(mdcKeysValuesFrom)); + + int i = 0; + for (String mdcKey : mdcKeys) { + // don't fail on incorrect mdc settings (e.g. empty conf.props list), but warn in app logs + if (mdcKey.isEmpty() || mdcKeysValuesFrom == null || mdcKeysValuesFrom.length < i + 1) { + LOG.warn("cannot set mdc key: {}", mdcKey); + break; + } + + String mdcValue = mdcKeysValuesFrom[i] == null ? 
"" : conf.get(mdcKeysValuesFrom[i]); + // MDC is backed by a Hashtable which rejects nulls: drop a stale entry instead of storing null + if (mdcValue != null) { + data.put(mdcKey, mdcValue); + } else { + data.remove(mdcKey); + LOG.warn("MDC_LOGGING: mdc value is null for key: {}, config key: {}", mdcKey, mdcKeysValuesFrom[i]); + } + + i++; + } + } + + public static String getPatternForAM(Configuration conf) { + String pattern = + conf.get(TezConfiguration.TEZ_LOG_PATTERN_LAYOUT_AM, TezConfiguration.TEZ_LOG_PATTERN_LAYOUT_DEFAULT); + return pattern.isEmpty() ? null : pattern; + } + + public static String getPatternForTask(Configuration conf) { + String pattern = + conf.get(TezConfiguration.TEZ_LOG_PATTERN_LAYOUT_TASK, TezConfiguration.TEZ_LOG_PATTERN_LAYOUT_DEFAULT); + return pattern.isEmpty() ? null : pattern; + } + + /** + * This method is for setting a NonClonableHashtable into log4j's mdc. Reflection hacks are + * needed, because MDC.mdc is well protected (final static MDC mdc = new MDC();). The logic below + * is supposed to be called once per JVM, so it's not a subject to performance bottlenecks. For + * further details of this solution, please check NonClonableHashtable class, which is set into + * the ThreadLocalMap. A wrong outcome of this method (any kind of runtime/reflection problems) + * should not affect the DAGAppMaster/TezChild. In case of an exception a ThreadLocalMap is + * returned, but it won't affect the content of the MDC. 
+ */ + @SuppressWarnings("unchecked") + public static ThreadLocalMap setupLog4j() { + ThreadLocalMap mdcContext = new ThreadLocalMap(); + mdcContext.set(new NonClonableHashtable()); + + try { + final Constructor[] constructors = org.apache.log4j.MDC.class.getDeclaredConstructors(); + for (Constructor c : constructors) { + c.setAccessible(true); + } + + org.apache.log4j.MDC mdc = (org.apache.log4j.MDC) constructors[0].newInstance(); + Field tlmField = org.apache.log4j.MDC.class.getDeclaredField("tlm"); + tlmField.setAccessible(true); + tlmField.set(mdc, mdcContext); + + Field mdcField = org.apache.log4j.MDC.class.getDeclaredField("mdc"); + mdcField.setAccessible(true); + // clear the FINAL flag of MDC.mdc so that the static field can be overwritten below + Field modifiers = Field.class.getDeclaredField("modifiers"); + modifiers.setAccessible(true); + modifiers.setInt(mdcField, mdcField.getModifiers() & ~Modifier.FINAL); + + mdcField.set(null, mdc); + + } catch (Exception e) { + LOG.warn("Cannot set log4j global MDC, mdcContext won't be applied to log4j's MDC class", e); + } + + return mdcContext; + } + + /** + * NonClonableHashtable is a special class for hacking the log4j MDC context. By design, log4j's + * MDC uses a ThreadLocalMap, which clones parent thread's context before propagating it to child + * thread (see {@link org.apache.log4j.helpers.ThreadLocalMap#childValue(Object)}). In our + * use case, this is not suitable, as we want to maintain only one context globally (and set e.g. + * dagId, taskAttemptId), then update it as easily as possible when dag/taskattempt changes, without + * having to propagate the update parameters to all the threads in the JVM. 
+ */ + private static class NonClonableHashtable extends Hashtable { + private static final long serialVersionUID = 1L; + + @Override + public synchronized Object clone() { + return this; + } + } +} diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index c8519201fd..a5d7b7db9f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -112,6 +112,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.log4j.helpers.ThreadLocalMap; import org.apache.tez.common.AsyncDispatcher; import org.apache.tez.common.AsyncDispatcherConcurrent; import org.apache.tez.common.GcTimeUpdater; @@ -184,6 +185,7 @@ import org.apache.tez.dag.utils.Simple2LevelVersionComparator; import org.apache.tez.hadoop.shim.HadoopShim; import org.apache.tez.hadoop.shim.HadoopShimsLoader; +import org.apache.tez.util.LoggingUtils; import org.apache.tez.util.TezMxBeanResourceCalculator; import org.codehaus.jettison.json.JSONException; import org.slf4j.Logger; @@ -336,6 +338,7 @@ public class DAGAppMaster extends AbstractService { // must be LinkedHashMap to preserve order of service addition Map services = new LinkedHashMap(); + private ThreadLocalMap mdcContext; public DAGAppMaster(ApplicationAttemptId applicationAttemptId, ContainerId containerId, String nmHost, int nmPort, int nmHttpPort, @@ -343,6 +346,7 @@ public DAGAppMaster(ApplicationAttemptId applicationAttemptId, String [] localDirs, String[] logDirs, String clientVersion, Credentials credentials, String jobUserName, AMPluginDescriptorProto pluginDescriptorProto) { super(DAGAppMaster.class.getName()); + this.mdcContext = LoggingUtils.setupLog4j(); this.clock = clock; this.startTime = clock.getTime(); this.appSubmitTime = appSubmitTime; 
@@ -690,7 +694,7 @@ protected TaskSchedulerManager getTaskSchedulerManager() { private void handleInternalError(String errDiagnosticsPrefix, String errDiagDagEvent) { state = DAGAppMasterState.ERROR; if (currentDAG != null) { - _updateLoggers(currentDAG, "_post"); + updateLoggers(currentDAG, "_post"); LOG.info(errDiagnosticsPrefix + ". Aborting dag: " + currentDAG.getID()); // Inform the current DAG about the error sendEvent(new DAGEventInternalError(currentDAG.getID(), errDiagDagEvent)); @@ -760,7 +764,7 @@ protected synchronized void handle(DAGAppMasterEvent event) { if (!isSession) { LOG.info("Not a session, AM will unregister as DAG has completed"); this.taskSchedulerManager.setShouldUnregisterFlag(); - _updateLoggers(currentDAG, "_post"); + updateLoggers(currentDAG, "_post"); setStateOnDAGCompletion(); LOG.info("Shutting down on completion of dag:" + finishEvt.getDAGId()); shutdownHandler.shutdown(); @@ -768,7 +772,7 @@ protected synchronized void handle(DAGAppMasterEvent event) { LOG.info("DAG completed, dagId=" + finishEvt.getDAGId() + ", dagState=" + finishEvt.getDAGState()); lastDAGCompletionTime = clock.getTime(); - _updateLoggers(currentDAG, "_post"); + updateLoggers(currentDAG, "_post"); if (this.historyEventHandler.hasRecoveryFailed()) { String recoveryErrorMsg = "Recovery had a fatal error, shutting down session after" + " DAG completion"; @@ -879,9 +883,10 @@ protected synchronized void handle(DAGAppMasterEvent event) { } } - private void _updateLoggers(DAG dag, String appender) { + private void updateLoggers(DAG dag, String appender) { try { - TezUtilsInternal.updateLoggers(dag.getID().toString() + appender); + TezUtilsInternal.updateLoggers(dag.getConf(), dag.getID().toString() + appender, + LoggingUtils.getPatternForAM(dag.getConf())); } catch (FileNotFoundException e) { LOG.warn("Unable to update the logger. 
Continue with the old logger", e ); } @@ -2007,7 +2012,7 @@ public void serviceStart() throws Exception { + ", state=" + (recoveredDAGData.dagState == null ? "null" : recoveredDAGData.dagState) + ", failureReason=" + recoveredDAGData.reason); - _updateLoggers(recoveredDAGData.recoveredDAG, ""); + updateLoggers(recoveredDAGData.recoveredDAG, ""); if (recoveredDAGData.nonRecoverable) { addDiagnostic("DAG " + recoveredDAGData.recoveredDagID + " can not be recovered due to " + recoveredDAGData.reason); @@ -2042,7 +2047,7 @@ public void serviceStart() throws Exception { } } else { LOG.info("Found DAG to recover, dagId=" + recoveredDAGData.recoveredDAG.getID()); - _updateLoggers(recoveredDAGData.recoveredDAG, ""); + updateLoggers(recoveredDAGData.recoveredDAG, ""); DAGRecoveredEvent dagRecoveredEvent = new DAGRecoveredEvent(this.appAttemptID, recoveredDAGData.recoveredDAG.getID(), recoveredDAGData.recoveredDAG.getName(), recoveredDAGData.recoveredDAG.getUserName(), this.clock.getTime(), this.containerLogs); @@ -2467,7 +2472,9 @@ private void startDAG(DAGPlan dagPlan, Map additionalAMRe // /////////////////// Create the job itself. 
final DAG newDAG = createDAG(dagPlan); - _updateLoggers(newDAG, ""); + LoggingUtils.initLoggingContext(mdcContext, newDAG.getConf(), newDAG.getID().toString(), null); + + updateLoggers(newDAG, ""); if (LOG.isDebugEnabled()) { LOG.debug("Running a DAG with " + dagPlan.getVertexCount() + " vertices "); diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java index b89b12db2b..7ab532ad33 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.log4j.helpers.ThreadLocalMap; import org.apache.tez.common.ContainerContext; import org.apache.tez.common.ContainerTask; import org.apache.tez.common.TezCommonUtils; @@ -63,6 +64,7 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.records.DAGProtos; +import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.utils.RelocalizationUtils; import org.apache.tez.hadoop.shim.HadoopShim; @@ -71,6 +73,8 @@ import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; import org.apache.tez.runtime.internals.api.TaskReporterInterface; +import org.apache.tez.util.LoggingUtils; + import org.apache.tez.util.TezRuntimeShutdownHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,6 +129,7 @@ public class TezChild { private TezVertexID lastVertexID; private final HadoopShim hadoopShim; private final TezExecutors sharedExecutor; + private ThreadLocalMap mdcContext; public 
TezChild(Configuration conf, String host, int port, String containerIdentifier, String tokenIdentifier, int appAttemptNumber, String workingDir, String[] localDirs, @@ -133,6 +138,7 @@ public TezChild(Configuration conf, String host, int port, String containerIdent ExecutionContext executionContext, Credentials credentials, long memAvailable, String user, TezTaskUmbilicalProtocol umbilical, boolean updateSysCounters, HadoopShim hadoopShim) throws IOException, InterruptedException { + this.mdcContext = LoggingUtils.setupLog4j(); this.defaultConf = conf; this.containerIdString = containerIdentifier; this.appAttemptNumber = appAttemptNumber; @@ -216,7 +222,7 @@ public ContainerExecutionResult run() throws IOException, InterruptedException, while (!executor.isTerminated() && !isShutdown.get()) { if (taskCount > 0) { - TezUtilsInternal.updateLoggers(""); + TezUtilsInternal.updateLoggers(defaultConf, "", LoggingUtils.getPatternForTask(defaultConf)); } ListenableFuture getTaskFuture = executor.submit(containerReporter); boolean error = false; @@ -240,6 +246,19 @@ public ContainerExecutionResult run() throws IOException, InterruptedException, shutdown(); } } + + // NOTE(review): a shouldDie response may carry no TaskSpec, so only read it for real tasks + if (!containerTask.shouldDie()) { + TezTaskAttemptID attemptId = containerTask.getTaskSpec().getTaskAttemptID(); + Configuration mdcConf = defaultConf; + if (containerTask.getTaskSpec().getTaskConf() != null) { + mdcConf = new Configuration(defaultConf); + TezTaskRunner2.mergeTaskSpecConfToConf(containerTask.getTaskSpec(), mdcConf); + } + LoggingUtils.initLoggingContext(mdcContext, mdcConf, + attemptId.getTaskID().getVertexID().getDAGID().toString(), attemptId.toString()); + } + TezCommonUtils.logCredentials(LOG, containerTask.getCredentials(), "containerTask"); if (containerTask.shouldDie()) { LOG.info("ContainerTask returned shouldDie=true for container {}, Exiting", containerIdString); @@ -256,7 +275,8 @@ public 
ContainerExecutionResult run() throws IOException, InterruptedException, containerTask.getTaskSpec().getTaskAttemptID().toString()); TezUtilsInternal.setHadoopCallerContext(hadoopShim, containerTask.getTaskSpec().getTaskAttemptID()); - TezUtilsInternal.updateLoggers(loggerAddend); + TezUtilsInternal.updateLoggers(defaultConf, loggerAddend, LoggingUtils.getPatternForTask(defaultConf)); + FileSystem.clearStatistics(); childUGI = handleNewTaskCredentials(containerTask, childUGI); @@ -270,6 +290,7 @@ public ContainerExecutionResult run() throws IOException, InterruptedException, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, executionContext, memAvailable, updateSysCounters, hadoopShim, sharedExecutor); + boolean shouldDie; try { TaskRunner2Result result = taskRunner.run(); diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java index bbf037b8bf..ce379b5d0a 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java @@ -29,6 +29,7 @@ import java.util.concurrent.locks.ReentrantLock; import com.google.common.annotations.VisibleForTesting; + import org.apache.tez.common.Preconditions; import com.google.common.collect.Multimap; import org.apache.commons.lang.exception.ExceptionUtils; @@ -140,18 +141,22 @@ public TezTaskRunner2(Configuration tezConf, UserGroupInformation ugi, String[] this.umbilicalAndErrorHandler = new UmbilicalAndErrorHandler(); this.hadoopShim = hadoopShim; this.taskConf = new Configuration(tezConf); + mergeTaskSpecConfToConf(taskSpec, taskConf); + localExecutor = sharedExecutor == null ? 
new TezSharedExecutor(tezConf) : null; + this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, taskConf, localDirs, + umbilicalAndErrorHandler, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, + objectRegistry, pid, executionContext, memAvailable, updateSysCounters, hadoopShim, + sharedExecutor == null ? localExecutor : sharedExecutor); + } + + static void mergeTaskSpecConfToConf(TaskSpec taskSpec, Configuration conf) { if (taskSpec.getTaskConf() != null) { Iterator<Entry<String, String>> iter = taskSpec.getTaskConf().iterator(); while (iter.hasNext()) { Entry<String, String> entry = iter.next(); - taskConf.set(entry.getKey(), entry.getValue()); + conf.set(entry.getKey(), entry.getValue()); } } - localExecutor = sharedExecutor == null ? new TezSharedExecutor(tezConf) : null; - this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, taskConf, localDirs, - umbilicalAndErrorHandler, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, - objectRegistry, pid, executionContext, memAvailable, updateSysCounters, hadoopShim, - sharedExecutor == null ? 
localExecutor : sharedExecutor); } /** diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java index 6876df93ec..a6d05beb5f 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java @@ -25,9 +25,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.common.TezExecutors; import org.apache.tez.common.TezSharedExecutor; import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.api.impl.InputSpec; import org.apache.tez.runtime.api.impl.OutputSpec; @@ -50,8 +55,13 @@ public void testTaskConfUsage() throws Exception { List inputSpecList = new ArrayList<>(); List outputSpecList = new ArrayList<>(); - TaskSpec taskSpec = new TaskSpec("dagName", "vertexName", 1, mock(ProcessorDescriptor.class), - inputSpecList, outputSpecList, null, taskConf); + TaskSpec taskSpec = + new TaskSpec( + TezTaskAttemptID.getInstance( + TezTaskID.getInstance(TezVertexID + .getInstance(TezDAGID.getInstance(ApplicationId.fromString("application_1_1"), 0), 0), 0), 0), + "dagName", "vertexName", 1, mock(ProcessorDescriptor.class), inputSpecList, + outputSpecList, null, taskConf); TezExecutors sharedExecutor = new TezSharedExecutor(conf); TezTaskRunner2 taskRunner2 = new TezTaskRunner2(conf, mock(UserGroupInformation.class), localDirs, taskSpec, 1, null, null, null, mock(TaskReporter.class), null, null, "pid", diff --git 
a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index 6de272358b..fd8f08b42c 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -164,10 +164,59 @@ public static void tearDown() { public void testHashJoinExample() throws Exception { HashJoinExample hashJoinExample = new HashJoinExample(); hashJoinExample.setConf(new Configuration(mrrTezCluster.getConfig())); - Path stagingDirPath = new Path("/tmp/tez-staging-dir"); - Path inPath1 = new Path("/tmp/hashJoin/inPath1"); - Path inPath2 = new Path("/tmp/hashJoin/inPath2"); - Path outPath = new Path("/tmp/hashJoin/outPath"); + runHashJoinExample(hashJoinExample); + } + + @Test(timeout = 60000) + public void testHashJoinExampleWithLogPattern() throws Exception { + HashJoinExample hashJoinExample = new HashJoinExample(); + + Configuration patternConfig = new Configuration(mrrTezCluster.getConfig()); + + patternConfig.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "debug"); + patternConfig.set(TezConfiguration.TEZ_TASK_LOG_LEVEL, "debug"); + patternConfig.set(TezConfiguration.TEZ_LOG_PATTERN_LAYOUT_AM, + "%d{ISO8601} [%p] [%t (queryId=%X{queryId} dag=%X{dagId})] |%c{2}|: %m%n"); + patternConfig.set(TezConfiguration.TEZ_LOG_PATTERN_LAYOUT_TASK, + "%d{ISO8601} [%p] [%t (queryId=%X{queryId} dag=%X{dagId} task=%X{taskAttemptId})] |%c{2}|: %m%n"); + patternConfig.set(TezConfiguration.TEZ_MDC_CUSTOM_KEYS, "queryId"); + patternConfig.set(TezConfiguration.TEZ_MDC_CUSTOM_KEYS_CONF_PROPS, "hive.query.id"); + patternConfig.set("hive.query.id", "hello-upstream-application-12345"); + + //1. feature is on + //[main (queryId=hello-upstream-application-12345 dag=dag_1666683231618_0001_1)] |app.DAGAppMaster| + hashJoinExample.setConf(patternConfig); + runHashJoinExample(hashJoinExample); + + //2. 
feature is on, but custom keys are empty: expecting empty queryId with the same format + //[main (queryId= dag=dag_1666683231618_0002_1)] |app.DAGAppMaster| + patternConfig.set(TezConfiguration.TEZ_MDC_CUSTOM_KEYS, ""); + hashJoinExample.setConf(patternConfig); + runHashJoinExample(hashJoinExample); + + //3. feature is on, custom keys are defined but corresponding value is null in config: + //expecting empty queryId with the same format + //[main (queryId= dag=dag_1666683231618_0003_1)] |app.DAGAppMaster| + patternConfig.set(TezConfiguration.TEZ_MDC_CUSTOM_KEYS, "queryId"); + patternConfig.set(TezConfiguration.TEZ_MDC_CUSTOM_KEYS_CONF_PROPS, "hive.query.id.null"); + hashJoinExample.setConf(patternConfig); + runHashJoinExample(hashJoinExample); + + //4. feature is off: expecting to have properly formatted log lines with original log4j config (not empty string) + //[main] |app.DAGAppMaster| + patternConfig.set(TezConfiguration.TEZ_LOG_PATTERN_LAYOUT_AM, TezConfiguration.TEZ_LOG_PATTERN_LAYOUT_DEFAULT); + patternConfig.set(TezConfiguration.TEZ_LOG_PATTERN_LAYOUT_TASK, TezConfiguration.TEZ_LOG_PATTERN_LAYOUT_DEFAULT); + + hashJoinExample.setConf(patternConfig); + runHashJoinExample(hashJoinExample); + } + + private void runHashJoinExample(HashJoinExample hashJoinExample) throws Exception { + int random = new Random(System.currentTimeMillis()).nextInt(10000); + Path stagingDirPath = new Path(String.format("/tmp/tez-staging-dir%d", random)); + Path inPath1 = new Path(String.format("/tmp/hashJoin%d/inPath1", random)); + Path inPath2 = new Path(String.format("/tmp/hashJoin%d/inPath2", random)); + Path outPath = new Path(String.format("/tmp/hashJoin%d/outPath", random)); remoteFs.mkdirs(inPath1); remoteFs.mkdirs(inPath2); remoteFs.mkdirs(stagingDirPath);