diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml deleted file mode 100644 index 50422ff0e0..0000000000 --- a/tez-dag/findbugs-exclude.xml +++ /dev/null @@ -1,267 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsyncProvider.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsyncProvider.java new file mode 100644 index 0000000000..689230c5aa --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsyncProvider.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.rm; + +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import java.util.List; + + +public class TezAMRMClientAsyncProvider /*implements AMRMClientAsync.CallbackHandler*/ { + + private static TezAMRMClientAsync INSTANCE; + // private static AMRMClient INSTANCE; + + public static synchronized TezAMRMClientAsync createAMRMClientAsync( + int intervalMs, AMRMClientAsync.CallbackHandler callbackHandler) { + if (INSTANCE == null) { + INSTANCE = + TezAMRMClientAsync.createAMRMClientAsync(1000, callbackHandler); + } + return INSTANCE; + } + + public static synchronized TezAMRMClientAsync getAMRMClientAsync() { + return INSTANCE; + } + + + /* public static synchronized AMRMClient createAMRMClientAsync(){ + + if(INSTANCE == null){ + + INSTANCE= AMRMClient.createAMRMClient(); + + } + + return INSTANCE; + }*/ + + + + /* @Override + public void onContainersCompleted(List statuses) { + } + + @Override + public void onContainersAllocated(List containers) { + } + + @Override + public void onShutdownRequest() { + // LOG.warn("Shutting down"); + // end.set(true); + System.out.println("onshutdownrequest"); + } + + @Override + public void onNodesUpdated(List updatedNodes) { + } + + @Override + public float getProgress() { + return 0; + } + @Override + public void onError(Throwable e) { + // LOG.error("Unexpected error", e); + // end.set(true); + System.out.println("error"); + }*/ + +} diff --git a/tez-plugins/tez-yarn-timeline-history-with-atsv2/pom.xml b/tez-plugins/tez-yarn-timeline-history-with-atsv2/pom.xml new file mode 100644 index 0000000000..5f8217632b --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history-with-atsv2/pom.xml @@ -0,0 +1,97 @@ + + + + 4.0.0 + + org.apache.tez + tez-plugins + 0.9.2 + + tez-yarn-timeline-history-with-atsv2 + + + + org.apache.tez + tez-api + + + org.apache.tez + tez-common + + + org.apache.tez + tez-dag + 0.9.2 + + + org.apache.tez + tez-tests + test-jar + test + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-yarn-api + 2.10.1 + + + org.apache.hadoop + hadoop-yarn-common + 2.10.1 + + + org.apache.hadoop + hadoop-yarn-client + 2.10.1 + + + org.apache.hadoop + hadoop-yarn-server-tests + test + test-jar + + + org.slf4j + slf4j-api + + + org.mockito + mockito-all + test + + + junit + junit + test + + + + + + + org.apache.rat + apache-rat-plugin + + + + + + diff --git a/tez-plugins/tez-yarn-timeline-history-with-atsv2/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV2HistoryLoggingService.java b/tez-plugins/tez-yarn-timeline-history-with-atsv2/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV2HistoryLoggingService.java new file mode 100644 index 0000000000..c01a6bd425 --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history-with-atsv2/src/main/java/org/apache/tez/dag/history/logging/ats/ATSV2HistoryLoggingService.java @@ -0,0 +1,355 @@ +package org.apache.tez.dag.history.logging.ats; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.client.api.TimelineV2Client; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.app.rm.TezAMRMClientAsync; +import org.apache.tez.dag.app.rm.TezAMRMClientAsyncProvider; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.events.DAGRecoveredEvent; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.logging.EntityTypes; +import org.apache.tez.dag.history.logging.HistoryLoggingService; +import org.apache.tez.dag.records.TezDAGID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ATSV2HistoryLoggingService extends HistoryLoggingService { + + private static final Logger LOG = + LoggerFactory.getLogger(ATSV2HistoryLoggingService.class); + public static final String TEZ_PREFIX = "tez_"; + + TimelineV2Client timelineV2Client; + private HashSet skippedDAGs = new HashSet(); + + BlockingQueue eventQueue = + new LinkedBlockingQueue(); + protected Thread eventHandlingThread; + private AtomicBoolean stopped = new AtomicBoolean(false); + private AtomicBoolean registered = new AtomicBoolean(false); + private final Object lock = new Object(); + private int eventCounter = 0; + private int eventsProcessed = 0; + private boolean timelineServiceEnabled = true; + private long maxTimeToWaitOnShutdown; + private boolean waitForeverOnShutdown = false; + + private long maxPollingTimeMillis; + + // Number of bytes of config which can be published in one shot to ATSv2. + public static final int ATS_CONFIG_PUBLISH_SIZE_BYTES = 10 * 1024; + + public ATSV2HistoryLoggingService() { + super(ATSV2HistoryLoggingService.class.getName()); + } + + @Override protected void serviceInit(Configuration conf) throws Exception { + boolean historyLoggingEnabled = + conf.getBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, + TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT); + if (!historyLoggingEnabled) { + LOG.info("ATSV2Service: History Logging disabled. " + + TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED + " set to false"); + return; + } + + maxTimeToWaitOnShutdown = + conf.getLong(TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS, + TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT); + maxPollingTimeMillis = + conf.getInt(TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT, + TezConfiguration.YARN_ATS_MAX_POLLING_TIME_PER_EVENT_DEFAULT); + + if (maxTimeToWaitOnShutdown < 0) { + waitForeverOnShutdown = true; + } + + timelineServiceEnabled = conf.getBoolean( + YarnConfiguration.TIMELINE_SERVICE_ENABLED, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED); + if (!timelineServiceEnabled && + conf.get(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "") + .equals(ATSV2HistoryLoggingService.class.getName())) { + LOG.warn(ATSV2HistoryLoggingService.class.getName() + + " is disabled due to Timeline Service being disabled, " + + YarnConfiguration.TIMELINE_SERVICE_ENABLED + " set to false"); + } + super.serviceInit(conf); + } + + @Override protected void serviceStart() throws Exception { + if (!timelineServiceEnabled) { + return; + } + super.serviceStart(); + } + + private void createAndRegisterTimelineClient(TezAMRMClientAsync amRmClient) throws Exception { + if (amRmClient == null) { + LOG.warn("Cannot register TimeClient without amRmClient"); + return; + } + Configuration conf = getConfig(); + if (timelineServiceEnabled) { + timelineV2Client = + TimelineV2Client.createTimelineClient(appContext.getApplicationID()); + timelineV2Client.init(conf); + LOG.info("timelineV2Client inited."); + amRmClient.registerTimelineV2Client(timelineV2Client); + + LOG.info("timelineV2Client registered using amRmClient."); + timelineV2Client.start(); + + eventHandlingThread = new Thread(createThread()); + eventHandlingThread.setName("ATSV2HistoryEventHandlingThread"); + eventHandlingThread.start(); + + registered.set(true); + } + } + + @Override protected void serviceStop() throws Exception { + if (!timelineServiceEnabled || !registered.get()) { + return; + } + LOG.info( + "Stopping ATSV2Service" + ", eventQueueBacklog=" + eventQueue.size()); + stopped.set(true); + // if (eventHandlingThread != null) { + // eventHandlingThread.interrupt(); + // } + try { + TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), + appContext.getApplicationID()); + synchronized (lock) { + if (eventHandlingThread != null) { + eventHandlingThread.interrupt(); + } + if (!eventQueue.isEmpty()) { + LOG.warn( + "ATSV2Service being stopped" + ", eventQueueBacklog=" + eventQueue + .size() + ", maxTimeLeftToFlush=" + maxTimeToWaitOnShutdown + + ", waitForever=" + waitForeverOnShutdown); + long startTime = appContext.getClock().getTime(); + long endTime = startTime + maxTimeToWaitOnShutdown; + while (waitForeverOnShutdown || (endTime >= appContext.getClock() + .getTime())) { + try { + DAGHistoryEvent event = + eventQueue.poll(maxPollingTimeMillis, TimeUnit.MILLISECONDS); + if (event == null) { + LOG.info("Event queue empty, stopping ATS Service"); + break; + } + try { + handleEvents(event); + } catch (Exception e) { + LOG.warn("Error handling event", e); + } + } catch (InterruptedException e) { + LOG.info("ATSService interrupted while shutting down. Exiting." + + " EventQueueBacklog=" + eventQueue.size()); + } + } + } + } + } finally { + appContext.getHadoopShim().clearHadoopCallerContext(); + } + if (!eventQueue.isEmpty()) { + LOG.warn("Did not finish flushing eventQueue before stopping ATSService" + + ", eventQueueBacklog=" + eventQueue.size()); + } + if (timelineV2Client != null) { + timelineV2Client.stop(); + } + // stop all the components + super.serviceStop(); + } + + private void handleEvents(DAGHistoryEvent event) { + TezDAGID dagID = event.getDagID(); + + if (event.getDagID() != null && skippedDAGs.contains(event.getDagID())) { + return; + } + LOG.info("createTimelineEntity called " + event.getHistoryEvent().getEventType()); + TimelineEntity timelineEntity = HistoryEventTimelineConversionAtsv2.createTimelineEntity(event.getHistoryEvent()); + + try { + // TODO identify which event need to publish in sync and async + timelineV2Client.putEntitiesAsync(timelineEntity); + } catch (IOException | YarnException e) { + LOG.error( + "Failed to publish Event " + event.getHistoryEvent().getEventType() + + " for the dag : " + dagID, e); + return; + } + + if (event.getHistoryEvent().getEventType() + .equals(HistoryEventType.DAG_SUBMITTED)) { + publishConfigsOnDagSubmittedEvent( + (DAGSubmittedEvent) event.getHistoryEvent()); + } + } + + private void publishConfigsOnDagSubmittedEvent(DAGSubmittedEvent event) { + String dagId = event.getDagID().toString(); + String entityId = TEZ_PREFIX + dagId; + String entityType = EntityTypes.TEZ_DAG_ID.name(); + TimelineEntity dagConfigEntity = new TimelineEntity(); + dagConfigEntity.setId(entityId); + dagConfigEntity.setType(entityType); + + try { + int configSize = 0; + for (Map.Entry entry : event.getConf()) { + int size = entry.getKey().length() + entry.getValue().length(); + configSize += size; + if (configSize > ATS_CONFIG_PUBLISH_SIZE_BYTES) { + if (dagConfigEntity.getConfigs().size() > 0) { + timelineV2Client.putEntitiesAsync(dagConfigEntity); + // create new entity + dagConfigEntity = new TimelineEntity(); + dagConfigEntity.setId(entityId); + dagConfigEntity.setType(entityType); + } + configSize = size; + } + dagConfigEntity.addConfig(entry.getKey(), entry.getValue()); + } + if (configSize > 0) { + LOG.info("before timelineV2Client putentities" + dagConfigEntity); + timelineV2Client.putEntities(dagConfigEntity); + LOG.info("after timelineV2Client putentities"); + } + } catch (IOException | YarnException e) { + LOG.error("Exception while publishing configs on JOB_SUBMITTED Event " + + " for the job : " + event.getDagID(), e); + } + } + + @Override public void handle(DAGHistoryEvent event) { + if (timelineServiceEnabled) { + if (!registered.get()) { + TezAMRMClientAsync amRmClient = TezAMRMClientAsyncProvider.getAMRMClientAsync(); + if (amRmClient != null) { + try { + createAndRegisterTimelineClient(amRmClient); + } catch(Exception ex){ + LOG.error("Error while create and register timelineClient", ex); + + } + } + } + eventQueue.add(event); + } + } + + Runnable createThread() { + return new Runnable() { + @Override public void run() { + boolean interrupted = false; + TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), + appContext.getApplicationID()); + while (!stopped.get() && !Thread.currentThread().isInterrupted() + && !interrupted) { + + // Log the size of the event-queue every so often. + if (eventCounter != 0 && eventCounter % 1000 == 0) { + if (eventsProcessed != 0 && !eventQueue.isEmpty()) { + LOG.info("Event queue stats" + ", eventsProcessedSinceLastUpdate=" + + eventsProcessed + ", eventQueueSize=" + eventQueue.size()); + } + eventCounter = 0; + eventsProcessed = 0; + } else { + ++eventCounter; + } + + synchronized (lock) { + try { + DAGHistoryEvent event = + eventQueue.poll(maxPollingTimeMillis, TimeUnit.MILLISECONDS); + if (event == null) { + continue; + } + if (!isValidEvent(event)) { + continue; + } + + try { + handleEvents(event); + eventsProcessed += 1; + } catch (Exception e) { + LOG.warn("Error handling events", e); + } + } catch (InterruptedException e) { + // Finish processing events and then return + interrupted = true; + } + } + } + } + }; + } + + private boolean isValidEvent(DAGHistoryEvent event) { + HistoryEventType eventType = event.getHistoryEvent().getEventType(); + TezDAGID dagId = event.getDagID(); + + if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) { + DAGSubmittedEvent dagSubmittedEvent = + (DAGSubmittedEvent) event.getHistoryEvent(); + String dagName = dagSubmittedEvent.getDAGName(); + if ((dagName != null && dagName + .startsWith(TezConstants.TEZ_PREWARM_DAG_NAME_PREFIX)) + || (!dagSubmittedEvent.isHistoryLoggingEnabled())) { + // Skip recording pre-warm DAG events + skippedDAGs.add(dagId); + return false; + } + } + if (eventType.equals(HistoryEventType.DAG_RECOVERED)) { + DAGRecoveredEvent dagRecoveredEvent = + (DAGRecoveredEvent) event.getHistoryEvent(); + if (!dagRecoveredEvent.isHistoryLoggingEnabled()) { + skippedDAGs.add(dagRecoveredEvent.getDagID()); + return false; + } + } + if (eventType.equals(HistoryEventType.DAG_FINISHED)) { + // Remove from set to keep size small + // No more events should be seen after this point. + if (skippedDAGs.remove(dagId)) { + return false; + } + } + + if (dagId != null && skippedDAGs.contains(dagId)) { + // Skip pre-warm DAGs + return false; + } + + return true; + } + +} + + diff --git a/tez-plugins/tez-yarn-timeline-history-with-atsv2/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversionAtsv2.java b/tez-plugins/tez-yarn-timeline-history-with-atsv2/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversionAtsv2.java new file mode 100644 index 0000000000..794a834c7b --- /dev/null +++ b/tez-plugins/tez-yarn-timeline-history-with-atsv2/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversionAtsv2.java @@ -0,0 +1,596 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history.logging.ats; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; +import org.apache.tez.common.ATSConstants; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.TezUncheckedException; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto; +import org.apache.tez.dag.app.web.AMWebController; +import org.apache.tez.dag.history.HistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.events.AMLaunchedEvent; +import org.apache.tez.dag.history.events.AMStartedEvent; +import org.apache.tez.dag.history.events.AppLaunchedEvent; +import org.apache.tez.dag.history.events.ContainerLaunchedEvent; +import org.apache.tez.dag.history.events.ContainerStoppedEvent; +import org.apache.tez.dag.history.events.DAGFinishedEvent; +import org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGRecoveredEvent; +import org.apache.tez.dag.history.events.DAGStartedEvent; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; +import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; +import org.apache.tez.dag.history.events.TaskFinishedEvent; +import org.apache.tez.dag.history.events.TaskStartedEvent; +import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent; +import org.apache.tez.dag.history.events.VertexFinishedEvent; +import org.apache.tez.dag.history.events.VertexInitializedEvent; +import org.apache.tez.dag.history.events.VertexStartedEvent; +import org.apache.tez.dag.history.logging.EntityTypes; +import org.apache.tez.dag.history.utils.DAGUtils; +import org.apache.tez.dag.records.TezVertexID; + +public class HistoryEventTimelineConversionAtsv2{ + + public static final String TEZ_PREFIX = "tez_"; + + private static void validateEvent(HistoryEvent event) { + if (!event.isHistoryEvent()) { + throw new UnsupportedOperationException( + "Invalid Event, does not support history" + ", eventType=" + event + .getEventType()); + } + } + + public static TimelineEntity createTimelineEntity(HistoryEvent historyEvent) { + validateEvent(historyEvent); + + switch (historyEvent.getEventType()) { + case APP_LAUNCHED: + return createAppLaunchedEntity((AppLaunchedEvent) historyEvent); + case AM_LAUNCHED: + return createAMLaunchedEntity((AMLaunchedEvent) historyEvent); + case AM_STARTED: + return createAMStartedEntity((AMStartedEvent) historyEvent); + case CONTAINER_LAUNCHED: + return createContainerLaunchedEntity( + (ContainerLaunchedEvent) historyEvent); + case CONTAINER_STOPPED: + return createContainerStoppedEntity((ContainerStoppedEvent) historyEvent); + case DAG_SUBMITTED: + return createDAGSubmittedEntity((DAGSubmittedEvent) historyEvent); + case DAG_INITIALIZED: + return createDAGInitializedEntity((DAGInitializedEvent) historyEvent); + case DAG_STARTED: + return createDAGStartedEntity((DAGStartedEvent) historyEvent); + case DAG_FINISHED: + return createDAGFinishedEntity((DAGFinishedEvent) historyEvent); + case VERTEX_INITIALIZED: + return createVertexInitializedEntity( + (VertexInitializedEvent) historyEvent); + case VERTEX_STARTED: + return createVertexStartedEntity((VertexStartedEvent) historyEvent); + case VERTEX_FINISHED: + return createVertexFinishedEntity((VertexFinishedEvent) historyEvent); + case TASK_STARTED: + return createTaskStartedEntity((TaskStartedEvent) historyEvent); + case TASK_FINISHED: + return createTaskFinishedEntity((TaskFinishedEvent) historyEvent); + case TASK_ATTEMPT_STARTED: + return createTaskAttemptStartedEntity( + (TaskAttemptStartedEvent) historyEvent); + case TASK_ATTEMPT_FINISHED: + return createTaskAttemptFinishedEntity( + (TaskAttemptFinishedEvent) historyEvent); + case VERTEX_CONFIGURE_DONE: + return createVertexReconfigureDoneEntity( + (VertexConfigurationDoneEvent) historyEvent); + case DAG_RECOVERED: + return createDAGRecoveredEntity((DAGRecoveredEvent) historyEvent); + case VERTEX_COMMIT_STARTED: + case VERTEX_GROUP_COMMIT_STARTED: + case VERTEX_GROUP_COMMIT_FINISHED: + case DAG_COMMIT_STARTED: + throw new UnsupportedOperationException( + "Invalid Event, does not support history" + ", eventType=" + + historyEvent.getEventType()); + default: + throw new UnsupportedOperationException( + "Unhandled Event" + ", eventType=" + historyEvent.getEventType()); + } + + } + + private static TimelineEntity createAppLaunchedEntity( + AppLaunchedEvent event) { + TimelineEntity entity = + createBaseEntity(TEZ_PREFIX + event.getApplicationId().toString(), + EntityTypes.TEZ_APPLICATION.name(), event.getLaunchTime()); + + TimelineEvent tEvent = + craeteBaseEvent(event.getEventType().name(), event.getLaunchTime()); + entity.addEvent(tEvent); + + entity.addInfo(ATSConstants.CONFIG, + DAGUtils.convertConfigurationToATSMap(event.getConf())); + entity.addInfo(ATSConstants.APPLICATION_ID, + event.getApplicationId().toString()); + entity.addInfo(ATSConstants.USER, event.getUser()); + if (event.getVersion() != null) { + entity.addInfo(ATSConstants.TEZ_VERSION, + DAGUtils.convertTezVersionToATSMap(event.getVersion())); + } + entity.addInfo(ATSConstants.DAG_AM_WEB_SERVICE_VERSION, AMWebController.VERSION); + + + return entity; + } + + private static TimelineEntity createAMLaunchedEntity(AMLaunchedEvent event) { + TimelineEntity entity = createBaseEntity( + TEZ_PREFIX + event.getApplicationAttemptId().toString(), + EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), event.getAppSubmitTime()); + + TimelineEvent tEvent = + craeteBaseEvent(event.getEventType().name(), event.getAppSubmitTime()); + entity.addEvent(tEvent); + entity.addInfo(ATSConstants.APP_SUBMIT_TIME, event.getAppSubmitTime()); + entity.addInfo(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + entity.addInfo(ATSConstants.APPLICATION_ATTEMPT_ID, + event.getApplicationAttemptId().toString()); + entity.addInfo(ATSConstants.USER, event.getUser()); + + return entity; + } + + private static TimelineEntity createAMStartedEntity(AMStartedEvent event) { + TimelineEntity entity = createBaseEntity( + TEZ_PREFIX + event.getApplicationAttemptId().toString(), + EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), null); + + TimelineEvent tEvent = + craeteBaseEvent(event.getEventType().name(), event.getStartTime()); + entity.addEvent(tEvent); + + return entity; + } + + private static TimelineEntity createContainerLaunchedEntity( + ContainerLaunchedEvent event) { + TimelineEntity entity = + createBaseEntity(TEZ_PREFIX + event.getContainerId().toString(), + EntityTypes.TEZ_CONTAINER_ID.name(), event.getLaunchTime()); + entity.addIsRelatedToEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), + "tez_" + event.getApplicationAttemptId().toString()); + + + entity.addInfo(ATSConstants.CONTAINER_ID, + event.getContainerId().toString()); + entity.setCreatedTime(event.getLaunchTime()); + TimelineEvent tEvent = + craeteBaseEvent(event.getEventType().name(), event.getLaunchTime()); + entity.addEvent(tEvent); + + return entity; + } + + private static TimelineEntity createContainerStoppedEntity( + ContainerStoppedEvent event) { + TimelineEntity entity = + createBaseEntity(TEZ_PREFIX + event.getContainerId().toString(), + EntityTypes.TEZ_CONTAINER_ID.name(), null); + + TimelineEvent tEvent = + craeteBaseEvent(event.getEventType().name(), event.getStoppedTime()); + entity.addEvent(tEvent); + + // In case, a container is stopped in a different attempt + entity.addIsRelatedToEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), + "tez_" + event.getApplicationAttemptId().toString()); + + entity.addInfo(ATSConstants.EXIT_STATUS, event.getExitStatus()); + entity.addInfo(ATSConstants.FINISH_TIME, event.getStoppedTime()); + + return entity; + } + + private static TimelineEntity createDAGSubmittedEntity( + DAGSubmittedEvent event) { + TimelineEntity entity = + createBaseEntity(TEZ_PREFIX + event.getDagID().toString(), + EntityTypes.TEZ_DAG_ID.name(), event.getSubmitTime()); + + TimelineEvent tEvent = + craeteBaseEvent(event.getEventType().name(), event.getSubmitTime()); + entity.addEvent(tEvent); + + + entity.addIsRelatedToEntity(EntityTypes.TEZ_APPLICATION.name(), + "tez_" + event.getApplicationAttemptId().getApplicationId().toString()); + entity.addIsRelatedToEntity(EntityTypes.TEZ_APPLICATION_ATTEMPT.name(), + "tez_" + event.getApplicationAttemptId().toString()); + + + if (event.getDAGPlan().hasCallerContext() + && event.getDAGPlan().getCallerContext().hasCallerId()) { + CallerContextProto callerContext = event.getDagPlan().getCallerContext(); + entity.addInfo(ATSConstants.CALLER_CONTEXT_ID, callerContext.getCallerId()); + entity.addInfo(ATSConstants.CALLER_CONTEXT, callerContext.getContext()); + } + + entity.addInfo(ATSConstants.APPLICATION_ID, + event.getApplicationAttemptId().getApplicationId().toString()); + entity.addInfo(ATSConstants.APPLICATION_ATTEMPT_ID, + event.getApplicationAttemptId().toString()); + entity.addInfo(ATSConstants.USER, event.getUser()); + entity.addInfo(ATSConstants.DAG_AM_WEB_SERVICE_VERSION, AMWebController.VERSION); + entity.addInfo(ATSConstants.IN_PROGRESS_LOGS_URL + "_" + + event.getApplicationAttemptId().getAttemptId(), event.getContainerLogs()); + if (event.getDAGPlan().hasCallerContext() + && event.getDAGPlan().getCallerContext().hasCallerId() + && event.getDAGPlan().getCallerContext().hasCallerType()) { + entity.addInfo(ATSConstants.CALLER_CONTEXT_ID, + event.getDAGPlan().getCallerContext().getCallerId()); + entity.addInfo(ATSConstants.CALLER_CONTEXT_TYPE, + event.getDAGPlan().getCallerContext().getCallerType()); + } + if (event.getQueueName() != null) { + entity.addInfo(ATSConstants.DAG_QUEUE_NAME, event.getQueueName()); + } + + try { + entity.addInfo(ATSConstants.DAG_PLAN, + DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan())); + } catch (IOException e) { + throw new TezUncheckedException(e); + } + return entity; + } + + private static TimelineEntity createDAGInitializedEntity( + DAGInitializedEvent event) { + TimelineEntity entity = + createBaseEntity(TEZ_PREFIX + event.getDagID().toString(), + EntityTypes.TEZ_DAG_ID.name(), null); + entity.addInfo(ATSConstants.INIT_TIME, event.getInitTime()); + if (event.getVertexNameIDMap() != null) { + Map nameIdStrMap = new TreeMap(); + for (Entry entry : event.getVertexNameIDMap().entrySet()) { + nameIdStrMap.put(entry.getKey(), entry.getValue().toString()); + } + entity.addInfo(ATSConstants.VERTEX_NAME_ID_MAPPING, nameIdStrMap); + } + + return entity; + } + + private static TimelineEntity createDAGStartedEntity(DAGStartedEvent event) { + TimelineEntity entity = + createBaseEntity(TEZ_PREFIX + event.getDagID().toString(), + EntityTypes.TEZ_DAG_ID.name(), null); + + TimelineEvent tEvent = + craeteBaseEvent(event.getEventType().name(), event.getStartTime()); + entity.addEvent(tEvent); + entity.addInfo(ATSConstants.START_TIME, event.getStartTime()); + entity.addInfo(ATSConstants.STATUS, event.getDagState().toString()); + return entity; + } + + private static TimelineEntity createDAGFinishedEntity( + DAGFinishedEvent event) { + TimelineEntity entity = + createBaseEntity(TEZ_PREFIX + event.getDagID().toString(), + EntityTypes.TEZ_DAG_ID.name(), null); + + TimelineEvent tEvent = + craeteBaseEvent(event.getEventType().name(), event.getFinishTime()); + entity.addEvent(tEvent); + + entity.addInfo(ATSConstants.START_TIME, event.getStartTime()); + entity.addInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); + entity.addInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); + entity.addInfo(ATSConstants.STATUS, event.getState().name()); + entity.addInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); + entity.addInfo(ATSConstants.COMPLETION_APPLICATION_ATTEMPT_ID, + event.getApplicationAttemptId().toString()); + + + final Map dagTaskStats = event.getDagTaskStats(); + if (dagTaskStats != null) { + for(Entry entry : dagTaskStats.entrySet()) { + entity.addInfo(entry.getKey(), entry.getValue()); + } + } + + entity.addInfo(ATSConstants.COUNTERS, + DAGUtils.convertCountersToATSMap(event.getTezCounters())); + + return entity; + } + + private static TimelineEntity createVertexInitializedEntity( + VertexInitializedEvent event) { + TimelineEntity entity = + createBaseEntity(TEZ_PREFIX + event.getVertexID().toString(), + EntityTypes.TEZ_VERTEX_ID.name(), event.getInitedTime()); + + TimelineEvent tEvent = + craeteBaseEvent(event.getEventType().name(), event.getInitedTime()); + entity.addEvent(tEvent); + + entity.addIsRelatedToEntity(EntityTypes.TEZ_DAG_ID.name(), + event.getVertexID().getDAGId().toString()); + entity.addInfo(ATSConstants.VERTEX_NAME, event.getVertexName()); + entity.addInfo(ATSConstants.INIT_REQUESTED_TIME, event.getInitRequestedTime()); + entity.addInfo(ATSConstants.INIT_TIME, event.getInitedTime()); + entity.addInfo(ATSConstants.NUM_TASKS, event.getNumTasks()); + entity.addInfo(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName()); + if (event.getServicePluginInfo() != null) { + entity.addInfo(ATSConstants.SERVICE_PLUGIN, + DAGUtils.convertServicePluginToATSMap(event.getServicePluginInfo())); + } + return entity; + } + + private static TimelineEntity createVertexStartedEntity( + VertexStartedEvent event) { + TimelineEntity entity = + createBaseEntity(TEZ_PREFIX + event.getVertexID().toString(), + EntityTypes.TEZ_VERTEX_ID.name(), null); + + TimelineEvent tEvent = craeteBaseEvent(event.getEventType().name(), + event.getStartRequestedTime()); + entity.addEvent(tEvent); + + + entity.addInfo(ATSConstants.START_REQUESTED_TIME, event.getStartRequestedTime()); + entity.addInfo(ATSConstants.START_TIME, event.getStartTime()); + entity.addInfo(ATSConstants.STATUS, event.getVertexState().toString()); + + return entity; + } + + private static TimelineEntity createVertexFinishedEntity( + VertexFinishedEvent event) { + TimelineEntity entity = + createBaseEntity(TEZ_PREFIX + event.getVertexID().toString(), + EntityTypes.TEZ_VERTEX_ID.name(), null); + + TimelineEvent tEvent = + craeteBaseEvent(event.getEventType().name(), event.getFinishTime()); + entity.addEvent(tEvent); + entity.addInfo(ATSConstants.VERTEX_NAME, event.getVertexName()); + entity.addInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); + entity.addInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); + entity.addInfo(ATSConstants.STATUS, event.getState().name()); + + entity.addInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); + entity.addInfo(ATSConstants.COUNTERS, + DAGUtils.convertCountersToATSMap(event.getTezCounters())); + entity.addInfo(ATSConstants.STATS, + DAGUtils.convertVertexStatsToATSMap(event.getVertexStats())); + if (event.getServicePluginInfo() != null) { + entity.addInfo(ATSConstants.SERVICE_PLUGIN, + DAGUtils.convertServicePluginToATSMap(event.getServicePluginInfo())); + } + + final Map vertexTaskStats = event.getVertexTaskStats(); + if (vertexTaskStats != null) { + for(Entry entry : vertexTaskStats.entrySet()) { + entity.addInfo(entry.getKey(), entry.getValue()); + } + } + + return entity; + } + + private static TimelineEntity createTaskStartedEntity( + TaskStartedEvent event) { + TimelineEntity entity = + createBaseEntity(TEZ_PREFIX + event.getTaskID().toString(), + EntityTypes.TEZ_TASK_ID.name(), event.getStartTime()); + + TimelineEvent tEvent = + craeteBaseEvent(event.getEventType().name(), event.getStartTime()); + entity.addEvent(tEvent); + + entity.addIsRelatedToEntity(EntityTypes.TEZ_VERTEX_ID.name(), + event.getTaskID().getVertexID().toString()); + entity.addInfo(ATSConstants.START_TIME, event.getStartTime()); + entity.addInfo(ATSConstants.SCHEDULED_TIME, event.getScheduledTime()); + entity.addInfo(ATSConstants.STATUS, event.getState().name()); + + return entity; + } + + private static TimelineEntity createTaskFinishedEntity( + TaskFinishedEvent event) { + TimelineEntity entity = + createBaseEntity(TEZ_PREFIX + event.getTaskID().toString(), + EntityTypes.TEZ_TASK_ID.name(), null); + + TimelineEvent tEvent = + craeteBaseEvent(event.getEventType().name(), event.getFinishTime()); + entity.addEvent(tEvent); + entity.addInfo(ATSConstants.START_TIME, event.getStartTime()); + entity.addInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); + entity.addInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); + entity.addInfo(ATSConstants.STATUS, event.getState().name()); + entity.addInfo(ATSConstants.NUM_FAILED_TASKS_ATTEMPTS, event.getNumFailedAttempts()); + if (event.getSuccessfulAttemptID() != null) { + entity.addInfo(ATSConstants.SUCCESSFUL_ATTEMPT_ID, + event.getSuccessfulAttemptID().toString()); + } + + entity.addInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); + entity.addInfo(ATSConstants.COUNTERS, + DAGUtils.convertCountersToATSMap(event.getTezCounters())); + return entity; + } + + private static TimelineEntity createTaskAttemptStartedEntity( + TaskAttemptStartedEvent event) { + TimelineEntity entity = + createBaseEntity(TEZ_PREFIX + event.getTaskAttemptID().toString(), + EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), event.getStartTime()); + + TimelineEvent tEvent = + craeteBaseEvent(event.getEventType().name(), event.getStartTime()); + entity.addEvent(tEvent); + + entity.addIsRelatedToEntity(EntityTypes.TEZ_TASK_ID.name(), + event.getTaskAttemptID().getTaskID().toString()); + + entity.addInfo(ATSConstants.START_TIME, event.getStartTime()); + if (event.getInProgressLogsUrl() != null) { + entity.addInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl()); + } + if (event.getCompletedLogsUrl() != null) { + entity.addInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl()); + } + entity.addInfo(ATSConstants.NODE_ID, event.getNodeId().toString()); + entity.addInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress()); + entity.addInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString()); + entity.addInfo(ATSConstants.STATUS, TaskAttemptState.RUNNING.name()); + + + return entity; + } + + private static TimelineEntity createTaskAttemptFinishedEntity( + TaskAttemptFinishedEvent event) { + TimelineEntity entity = + createBaseEntity(TEZ_PREFIX + event.getTaskAttemptID().toString(), + EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), null); + + TimelineEvent tEvent = + craeteBaseEvent(event.getEventType().name(), event.getFinishTime()); + entity.addEvent(tEvent); + + if (event.getTaskFailureType() != null) { + entity.addInfo(ATSConstants.TASK_FAILURE_TYPE, event.getTaskFailureType().name()); + } + entity.addInfo(ATSConstants.CREATION_TIME, event.getCreationTime()); + entity.addInfo(ATSConstants.ALLOCATION_TIME, event.getAllocationTime()); + entity.addInfo(ATSConstants.START_TIME, event.getStartTime()); + entity.addInfo(ATSConstants.FINISH_TIME, event.getFinishTime()); + if (event.getCreationCausalTA() != null) { + entity.addInfo(ATSConstants.CREATION_CAUSAL_ATTEMPT, + event.getCreationCausalTA().toString()); + } + entity.addInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime())); + entity.addInfo(ATSConstants.STATUS, event.getState().name()); + if (event.getTaskAttemptError() != null) { + entity.addInfo(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name()); + } + entity.addInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); + entity.addInfo(ATSConstants.COUNTERS, + DAGUtils.convertCountersToATSMap(event.getCounters())); + if (event.getDataEvents() != null && !event.getDataEvents().isEmpty()) { + entity.addInfo(ATSConstants.LAST_DATA_EVENTS, + DAGUtils.convertDataEventDependecyInfoToATS(event.getDataEvents())); + } + if (event.getNodeId() != null) { + entity.addInfo(ATSConstants.NODE_ID, event.getNodeId().toString()); + } + if (event.getContainerId() != null) { + entity.addInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString()); + } + if (event.getInProgressLogsUrl() != null) { + entity.addInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl()); + } + if (event.getCompletedLogsUrl() != null) { + entity.addInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl()); + } + if (event.getNodeHttpAddress() != null) { + entity.addInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress()); + } + + return entity; + } + + private static TimelineEntity createVertexReconfigureDoneEntity( + VertexConfigurationDoneEvent event) { + TimelineEntity entity = + createBaseEntity(TEZ_PREFIX + event.getVertexID().toString(), + EntityTypes.TEZ_VERTEX_ID.name(), null); + + TimelineEvent tEvent = craeteBaseEvent(event.getEventType().name(), + event.getReconfigureDoneTime()); + //entity.addEvent(tEvent); + + Map eventInfo = new HashMap(); + if (event.getSourceEdgeProperties() != null && !event.getSourceEdgeProperties().isEmpty()) { + Map updatedEdgeManagers = new HashMap(); + for (Entry entry : + event.getSourceEdgeProperties().entrySet()) { + updatedEdgeManagers.put(entry.getKey(), + DAGUtils.convertEdgeProperty(entry.getValue())); + } + eventInfo.put(ATSConstants.UPDATED_EDGE_MANAGERS, updatedEdgeManagers); + } + eventInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks()); + tEvent.setInfo(eventInfo); + entity.addEvent(tEvent); + + entity.addInfo(ATSConstants.NUM_TASKS, event.getNumTasks()); + + return entity; + } + + private static TimelineEntity createDAGRecoveredEntity( + DAGRecoveredEvent event) { + TimelineEntity entity = + createBaseEntity(TEZ_PREFIX + event.getDagID().toString(), + EntityTypes.TEZ_DAG_ID.name(), null); + + TimelineEvent tEvent = + craeteBaseEvent(event.getEventType().name(), event.getRecoveredTime()); + entity.addEvent(tEvent); + + return entity; + } + + private static TimelineEntity createBaseEntity(String entityId, + String entityType, Long createdTime) { + TimelineEntity entity = new TimelineEntity(); + entity.setId(entityId); + entity.setType(entityType); + entity.setCreatedTime(createdTime); + return entity; + } + + private static TimelineEvent craeteBaseEvent(String eventType, + long createdTime) { + TimelineEvent tEvent = new TimelineEvent(); + tEvent.setId(eventType); + tEvent.setTimestamp(createdTime); + return tEvent; + } +}