
Fix missing Spark flow and job info. #198

Closed
85 changes: 41 additions & 44 deletions app/com/linkedin/drelephant/util/InfoExtractor.java
@@ -20,22 +20,22 @@
import com.linkedin.drelephant.configurations.scheduler.SchedulerConfiguration;
import com.linkedin.drelephant.configurations.scheduler.SchedulerConfigurationData;
import com.linkedin.drelephant.exceptions.WorkflowClient;
import com.linkedin.drelephant.mapreduce.data.MapReduceApplicationData;
import com.linkedin.drelephant.schedulers.Scheduler;
import com.linkedin.drelephant.spark.data.SparkApplicationData;

import java.lang.reflect.InvocationTargetException;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;

import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.w3c.dom.Document;

import models.AppResult;

import com.linkedin.drelephant.mapreduce.data.MapReduceApplicationData;


/**
* InfoExtractor is responsible for retrieving information and context about a
* job from the job's configuration
@@ -106,55 +106,52 @@ public static Scheduler getSchedulerInstance(String appId, Properties properties
* @param data The Hadoop application data
*/
public static void loadInfo(AppResult result, HadoopApplicationData data) {
Properties properties = new Properties();
if (data instanceof MapReduceApplicationData) {
properties = retrieveMapreduceProperties((MapReduceApplicationData) data);
}

Properties properties = data.getConf();
Contributor Author:
After #162, getConf should be sufficient for both MapReduceApplicationData and SparkApplicationData.
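For context, a minimal sketch of the interface contract this change relies on. The method set below is inferred from the anonymous HadoopApplicationData in the test further down; treat it as an illustration, not the authoritative interface definition.

// Sketch: both application data types expose their configuration through the
// common interface, so loadInfo no longer needs to special-case MapReduce.
public interface HadoopApplicationData {
  String getAppId();
  Properties getConf();
  ApplicationType getApplicationType();
  boolean isEmpty();
}

// MapReduceApplicationData and (per #162) SparkApplicationData both implement
// getConf(), so a single polymorphic call replaces the old instanceof check.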

Scheduler scheduler = getSchedulerInstance(data.getAppId(), properties);

if (scheduler != null) {
String appId = data.getAppId();

// Load all the Ids
result.jobDefId = Utils.truncateField(scheduler.getJobDefId(), AppResult.URL_LEN_LIMIT, appId);
result.jobExecId = Utils.truncateField(scheduler.getJobExecId(), AppResult.URL_LEN_LIMIT, appId);
result.flowDefId = Utils.truncateField(scheduler.getFlowDefId(), AppResult.URL_LEN_LIMIT, appId);
result.flowExecId = Utils.truncateField(scheduler.getFlowExecId(), AppResult.FLOW_EXEC_ID_LIMIT, appId);

// Dr. Elephant expects all the 4 ids(jobDefId, jobExecId, flowDefId, flowExecId) to be set.
if (!Utils.isSet(result.jobDefId) || !Utils.isSet(result.jobExecId) || !Utils.isSet(result.flowDefId) || !Utils
.isSet(result.flowExecId)) {
logger.warn("This job doesn't have the correct " + scheduler.getSchedulerName() + " integration support. I"
+ " will treat this as an adhoc job");
loadNoSchedulerInfo(result);
} else {
result.scheduler = Utils.truncateField(scheduler.getSchedulerName(), AppResult.SCHEDULER_LIMIT, appId);
result.workflowDepth = scheduler.getWorkflowDepth();
result.jobName = scheduler.getJobName() != null ? Utils
.truncateField(scheduler.getJobName(), AppResult.JOB_NAME_LIMIT, appId) : "";
result.jobDefUrl = scheduler.getJobDefUrl() != null ? Utils
.truncateField(scheduler.getJobDefUrl(), AppResult.URL_LEN_LIMIT, appId) : "";
result.jobExecUrl = scheduler.getJobExecUrl() != null ? Utils
.truncateField(scheduler.getJobExecUrl(), AppResult.URL_LEN_LIMIT, appId) : "";
result.flowDefUrl = scheduler.getFlowDefUrl() != null ? Utils
.truncateField(scheduler.getFlowDefUrl(), AppResult.URL_LEN_LIMIT, appId) : "";
result.flowExecUrl = scheduler.getFlowExecUrl() != null ? Utils
.truncateField(scheduler.getFlowExecUrl(), AppResult.URL_LEN_LIMIT, appId) : "";
}
} else {
if (scheduler == null) {
loadNoSchedulerInfo(result);
} else if (StringUtils.isEmpty(scheduler.getJobDefId()) || StringUtils.isEmpty(scheduler.getJobExecId())
|| StringUtils.isEmpty(scheduler.getFlowDefId()) || StringUtils.isEmpty(scheduler.getFlowExecId())) {
logger.warn("This job doesn't have the correct " + scheduler.getSchedulerName() + " integration support. I"
+ " will treat this as an adhoc job");
loadNoSchedulerInfo(result);
} else {
loadSchedulerInfo(result, data, scheduler);
}
}

/**
* Retrieve the Mapreduce properties
* Populates the given app result with the info from the given application data and scheduler.
*
* @param appData the Mapreduce Application Data
* @return the retrieved mapreduce properties
* @param result the AppResult to populate
* @param data the HadoopApplicationData to use when populating the result
* @param scheduler the Scheduler to use when populating the result
*/
public static Properties retrieveMapreduceProperties(MapReduceApplicationData appData) {
return appData.getConf();
public static void loadSchedulerInfo(AppResult result, HadoopApplicationData data, Scheduler scheduler) {
String appId = data.getAppId();

result.scheduler = Utils.truncateField(scheduler.getSchedulerName(), AppResult.SCHEDULER_LIMIT, appId);
result.workflowDepth = scheduler.getWorkflowDepth();

result.jobName = scheduler.getJobName() != null ? Utils
.truncateField(scheduler.getJobName(), AppResult.JOB_NAME_LIMIT, appId) : "";

result.jobDefId = Utils.truncateField(scheduler.getJobDefId(), AppResult.URL_LEN_LIMIT, appId);
result.jobDefUrl = scheduler.getJobDefUrl() != null ? Utils
.truncateField(scheduler.getJobDefUrl(), AppResult.URL_LEN_LIMIT, appId) : "";

result.jobExecId = Utils.truncateField(scheduler.getJobExecId(), AppResult.URL_LEN_LIMIT, appId);
result.jobExecUrl = scheduler.getJobExecUrl() != null ? Utils
.truncateField(scheduler.getJobExecUrl(), AppResult.URL_LEN_LIMIT, appId) : "";

result.flowDefId = Utils.truncateField(scheduler.getFlowDefId(), AppResult.URL_LEN_LIMIT, appId);
result.flowDefUrl = scheduler.getFlowDefUrl() != null ? Utils
.truncateField(scheduler.getFlowDefUrl(), AppResult.URL_LEN_LIMIT, appId) : "";

result.flowExecId = Utils.truncateField(scheduler.getFlowExecId(), AppResult.FLOW_EXEC_ID_LIMIT, appId);
result.flowExecUrl = scheduler.getFlowExecUrl() != null ? Utils
.truncateField(scheduler.getFlowExecUrl(), AppResult.URL_LEN_LIMIT, appId) : "";
}

/**
5 changes: 3 additions & 2 deletions app/com/linkedin/drelephant/util/Utils.java
@@ -234,11 +234,12 @@ public static Map<String, String> parseCsKeyValue(String str) {
*
* @param field the field to be truncated
* @param limit the truncation limit
* @param context additional context for logging purposes
Contributor Author:
I was wondering why truncateField was taking an appId, then I saw what was going on here. Hopefully this change helps.

* @return The truncated field
*/
public static String truncateField(String field, int limit, String appId) {
public static String truncateField(String field, int limit, String context) {
if (field != null && limit > TRUNCATE_SUFFIX.length() && field.length() > limit) {
logger.info("Truncating " + field + " to " + limit + " characters for " + appId);
logger.info("Truncating " + field + " to " + limit + " characters for " + context);
field = field.substring(0, limit - 3) + "...";
}
return field;
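As a quick illustration of truncateField and the renamed context parameter discussed in the comment above, a hypothetical call; the values are made up for the example, and TRUNCATE_SUFFIX is assumed to be "..." (consistent with the limit - 3 arithmetic).

// Hypothetical usage: with limit = 20, the first 17 characters are kept and
// "..." is appended, so the result is exactly 20 characters long.
String url = "https://grid.example.com:9000/executor?execid=123456";
String truncated = Utils.truncateField(url, 20, "application_5678");
// Logs: Truncating <url> to 20 characters for application_5678
// truncated is "https://grid.exam..."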
65 changes: 64 additions & 1 deletion test/com/linkedin/drelephant/util/InfoExtractorTest.java
@@ -16,12 +16,16 @@

package com.linkedin.drelephant.util;

import com.linkedin.drelephant.analysis.ApplicationType;
import com.linkedin.drelephant.analysis.HadoopApplicationData;
import com.linkedin.drelephant.configurations.scheduler.SchedulerConfigurationData;
import com.linkedin.drelephant.schedulers.AirflowScheduler;
import com.linkedin.drelephant.schedulers.AzkabanScheduler;
import com.linkedin.drelephant.schedulers.OozieScheduler;
import com.linkedin.drelephant.schedulers.Scheduler;

import java.util.Properties;
import models.AppResult;

import org.junit.After;
import org.junit.Before;
@@ -31,14 +35,15 @@
import mockit.Expectations;
import mockit.Mocked;
import mockit.integration.junit4.JMockit;
import org.apache.commons.lang.StringUtils;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;

import play.test.FakeApplication;
import play.test.Helpers;

import static org.junit.Assert.assertEquals;

import static org.junit.Assert.assertFalse;

@RunWith(JMockit.class)
public class InfoExtractorTest {
@@ -147,4 +152,62 @@ public void testGetSchedulerInstanceNull() {
assertEquals(null, scheduler);
Contributor Author:
assertNull would be more obvious, I think, but I tried to limit the scope of the changes in this PR.
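For reference, the plain JUnit 4 assertion the comment alludes to; a sketch of the alternative, not a change the PR makes.

import static org.junit.Assert.assertNull;

// States the intent directly instead of comparing against null.
assertNull(scheduler);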

}

@Test
public void testLoadSchedulerInfo() {
Properties properties = new Properties();
properties.put(AzkabanScheduler.AZKABAN_JOB_URL,
"https://grid.example.com:9000/manager?project=project-name&flow=flow-name&job=job-name");
properties.put(AzkabanScheduler.AZKABAN_ATTEMPT_URL,
"https://grid.example.com:9000/executor?execid=123456&job=job-name&attempt=0");
properties.put(AzkabanScheduler.AZKABAN_WORKFLOW_URL,
"https://grid.example.com:9000/manager?project=project-name&flow=flow-name");
properties.put(AzkabanScheduler.AZKABAN_EXECUTION_URL,
"https://grid.example.com:9000/executor?execid=123456");
properties.put(AzkabanScheduler.AZKABAN_JOB_NAME, "job-name");

SchedulerConfigurationData schedulerConfigurationData = new SchedulerConfigurationData("azkaban", null, null);

Scheduler scheduler = new AzkabanScheduler("id", properties, schedulerConfigurationData);

AppResult result = new AppResult();

HadoopApplicationData data =
new HadoopApplicationData() {
String appId = "application_5678";
Properties conf = new Properties();
ApplicationType applicationType = new ApplicationType("foo");

@Override
public String getAppId() {
return appId;
}

@Override
public Properties getConf() {
return conf;
}

@Override
public ApplicationType getApplicationType() {
return applicationType;
}

@Override
public boolean isEmpty() {
return false;
}
};

InfoExtractor.loadSchedulerInfo(result, data, scheduler);

assertEquals(result.scheduler, "azkaban");
assertFalse(StringUtils.isEmpty(result.getJobExecId()));
assertFalse(StringUtils.isEmpty(result.getJobDefId()));
assertFalse(StringUtils.isEmpty(result.getFlowExecId()));
assertFalse(StringUtils.isEmpty(result.getFlowDefId()));
assertFalse(StringUtils.isEmpty(result.getJobExecUrl()));
assertFalse(StringUtils.isEmpty(result.getJobDefUrl()));
assertFalse(StringUtils.isEmpty(result.getFlowExecUrl()));
assertFalse(StringUtils.isEmpty(result.getFlowDefUrl()));
}
Contributor Author:
There's probably a Hamcrest matcher we could use here, but combining assertFalse with StringUtils.isEmpty seems to be good enough.
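One possible Hamcrest spelling of these checks, assuming Hamcrest 1.3 (the version bundled with JUnit 4) is on the classpath; a sketch of the alternative, not part of the PR.

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.isEmptyOrNullString;
import static org.hamcrest.Matchers.not;

// Fails with the offending value in the message, unlike a bare assertFalse.
assertThat(result.getJobExecId(), not(isEmptyOrNullString()));
assertThat(result.getFlowExecUrl(), not(isEmptyOrNullString()));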

}