Skip to content

Commit

Permalink
Sync master (linkedin#34)
Browse files Browse the repository at this point in the history
* Fix linkedin#162 with the right calculation for resources wasted and add missing workflow links (linkedin#207)

* Fix Exception thrown when JAVA_EXTRA_OPTIONS is not present (linkedin#210)

* Adds an option to fetch recently finished apps from RM (linkedin#212)

* Fixes issue caused by http in history server config property (linkedin#217)

* add config for timezone of job history server (linkedin#214)

* Include reference to the weekly meeting
  • Loading branch information
shkhrgpt authored Mar 8, 2017
1 parent f0a4a74 commit 036d197
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 3 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ Google groups mailing list: [Click here](https://groups.google.com/forum/#!forum

Github issues: [click here](https://github.com/linkedin/dr-elephant/issues)

## Meetings

We have scheduled a weekly Dr. Elephant meeting for the interested developers and users to discuss future plans for Dr. Elephant. Please [click here](https://github.com/linkedin/dr-elephant/issues/209) for details.

## How to Contribute?

Check this [link](https://github.com/linkedin/dr-elephant/wiki/How-to-Contribute%3F).
Expand Down
4 changes: 4 additions & 0 deletions app-conf/FetcherConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
files, but also increase the risk of OutOfMemory error. The default heap size of Dr. Elephant
is 1024MB. To increase this, e.g. to 2048MB, run this before start.sh:
export OPTS="-mem 2048"
To work properly, this fetcher should use the same timezone as the job history server.
If this property is not set, the local timezone will be used.
-->
<!--
<fetcher>
Expand All @@ -53,6 +56,7 @@
<params>
<sampling_enabled>false</sampling_enabled>
<history_log_size_limit_in_mb>500</history_log_size_limit_in_mb>
<history_server_time_zone>PST</history_server_time_zone>
</params>
</fetcher>
-->
Expand Down
8 changes: 8 additions & 0 deletions app-conf/GeneralConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,12 @@
<value>true</value>
<description>If this property is "false", search will only make exact matches</description>
</property>
<!--
Initial look-back window, in milliseconds, controlling how far in the past to fetch finished apps from the RM on startup.
-->
<!--
<property>
<name>drelephant.analysis.fetch.initial.windowMillis</name>
<value>3600000</value>
</property> -->
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public class AnalyticJobGeneratorHadoop2 implements AnalyticJobGenerator {
private static final String IS_RM_HA_ENABLED = "yarn.resourcemanager.ha.enabled";
private static final String RESOURCE_MANAGER_IDS = "yarn.resourcemanager.ha.rm-ids";
private static final String RM_NODE_STATE_URL = "http://%s/ws/v1/cluster/info";
private static final String FETCH_INITIAL_WINDOW_MS = "drelephant.analysis.fetch.initial.windowMillis";

private static Configuration configuration;

// We provide one minute job fetch delay due to the job sending lag from AM/NM to JobHistoryServer HDFS
Expand All @@ -58,6 +60,7 @@ public class AnalyticJobGeneratorHadoop2 implements AnalyticJobGenerator {

private String _resourceManagerAddress;
private long _lastTime = 0;
private long _fetchStartTime = 0;
private long _currentTime = 0;
private long _tokenUpdatedTime = 0;
private AuthenticatedURL.Token _token;
Expand Down Expand Up @@ -109,6 +112,12 @@ public void updateResourceManagerAddresses() {
/**
 * Configures this generator from the given Hadoop configuration.
 *
 * If {@code drelephant.analysis.fetch.initial.windowMillis} is set, the fetch
 * start time is moved back by that window (plus the standard fetch delay) so
 * that recently finished apps within the window are picked up on startup.
 *
 * @param configuration the Hadoop configuration to read settings from
 * @throws IOException if resolving the resource manager addresses fails
 */
public void configure(Configuration configuration)
    throws IOException {
  this.configuration = configuration;
  String initialFetchWindowString = configuration.get(FETCH_INITIAL_WINDOW_MS);
  if (initialFetchWindowString != null) {
    // BUG FIX: Long.getLong(s) reads a *system property* named s (returning
    // null here and throwing an NPE on unboxing). The configured value must
    // be parsed directly with Long.parseLong.
    long initialFetchWindow = Long.parseLong(initialFetchWindowString);
    // Rewind the last-seen timestamp so apps finished within the window
    // (minus the usual history-server propagation delay) are fetched.
    _lastTime = System.currentTimeMillis() - FETCH_DELAY - initialFetchWindow;
    _fetchStartTime = _lastTime;
  }
  updateResourceManagerAddresses();
}

Expand Down Expand Up @@ -212,7 +221,7 @@ private List<AnalyticJob> readApps(URL url) throws IOException, AuthenticationEx

// When called first time after launch, hit the DB and avoid duplicated analytic jobs that have been analyzed
// before.
if (_lastTime > 0 || (_lastTime == 0 && AppResult.find.byId(appId) == null)) {
if (_lastTime > _fetchStartTime || (_lastTime == _fetchStartTime && AppResult.find.byId(appId) == null)) {
String user = app.get("user").getValueAsText();
String name = app.get("name").getValueAsText();
String queueName = app.get("queue").getValueAsText();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;

/**
* This class implements the Fetcher for MapReduce Applications on Hadoop2
Expand All @@ -57,6 +58,7 @@ public class MapReduceFSFetcherHadoop2 extends MapReduceFetcher {
private static final Logger logger = Logger.getLogger(MapReduceFSFetcherHadoop2.class);

private static final String LOG_SIZE_XML_FIELD = "history_log_size_limit_in_mb";
private static final String HISTORY_SERVER_TIME_ZONE_XML_FIELD = "history_server_time_zone";
private static final String TIMESTAMP_DIR_FORMAT = "%04d" + File.separator + "%02d" + File.separator + "%02d";
private static final int SERIAL_NUMBER_DIRECTORY_DIGITS = 6;
protected static final double DEFALUT_MAX_LOG_SIZE_IN_MB = 500;
Expand All @@ -65,6 +67,7 @@ public class MapReduceFSFetcherHadoop2 extends MapReduceFetcher {
private String _historyLocation;
private String _intermediateHistoryLocation;
private double _maxLogSizeInMB;
private TimeZone _timeZone;

public MapReduceFSFetcherHadoop2(FetcherConfigurationData fetcherConfData) throws IOException {
super(fetcherConfData);
Expand All @@ -78,6 +81,10 @@ public MapReduceFSFetcherHadoop2(FetcherConfigurationData fetcherConfData) throw
}
logger.info("The history log limit of MapReduce application is set to " + _maxLogSizeInMB + " MB");

String timeZoneStr = fetcherConfData.getParamMap().get(HISTORY_SERVER_TIME_ZONE_XML_FIELD);
_timeZone = timeZoneStr == null ? TimeZone.getDefault() : TimeZone.getTimeZone(timeZoneStr);
logger.info("Using timezone: " + _timeZone.getID());

Configuration conf = new Configuration();
this._fs = FileSystem.get(conf);
this._historyLocation = conf.get("mapreduce.jobhistory.done-dir");
Expand All @@ -94,6 +101,10 @@ public double getMaxLogSizeInMB() {
return _maxLogSizeInMB;
}

/**
 * Returns the timezone used to resolve job history directory paths;
 * configured via the {@code history_server_time_zone} fetcher parameter,
 * falling back to the JVM default timezone when unset.
 *
 * @return the history server timezone in effect for this fetcher
 */
public TimeZone getTimeZone() {
return _timeZone;
}

/**
* The location of a job history file is in format: {done-dir}/yyyy/mm/dd/{serialPart}.
* yyyy/mm/dd is the year, month and date of the finish time.
Expand All @@ -112,7 +123,7 @@ public double getMaxLogSizeInMB() {
*/
protected String getHistoryDir(AnalyticJob job) {
// generate the date part
Calendar timestamp = Calendar.getInstance();
Calendar timestamp = Calendar.getInstance(_timeZone);
timestamp.setTimeInMillis(job.getFinishTime());
String datePart = String.format(TIMESTAMP_DIR_FORMAT,
timestamp.get(Calendar.YEAR),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,14 @@ class SparkRestClient(sparkConf: SparkConf) {

private val historyServerUri: URI = sparkConf.getOption(HISTORY_SERVER_ADDRESS_KEY) match {
case Some(historyServerAddress) =>
val baseUri = new URI(s"http://${historyServerAddress}")
val baseUri: URI =
// Latest versions of CDH include http in their history server address configuration.
// However, it is not recommended by Spark documentation(http://spark.apache.org/docs/latest/running-on-yarn.html)
if (historyServerAddress.contains(s"http://")) {
new URI(historyServerAddress)
} else {
new URI(s"http://${historyServerAddress}")
}
require(baseUri.getPath == "")
baseUri
case None =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.TimeZone;

public class MapReduceFSFetcherHadoop2Test {

Expand Down Expand Up @@ -77,6 +78,7 @@ public void testFetcherDefaultConfig() {
fetcherConf.getFetchersConfigurationData().get(0));
Assert.assertFalse("Sampling should be disabled in default", fetcher.isSamplingEnabled());
Assert.assertEquals(fetcher.DEFALUT_MAX_LOG_SIZE_IN_MB, fetcher.getMaxLogSizeInMB(), 0.0001);
Assert.assertEquals(TimeZone.getDefault(), fetcher.getTimeZone());

List<Object> list = new ArrayList<Object>();
int listLen = fetcher.MAX_SAMPLE_SIZE * 2;
Expand All @@ -98,6 +100,7 @@ public void testFetcherConfig() {
fetcherConf.getFetchersConfigurationData().get(0));
Assert.assertTrue("Failed to enable sampling", fetcher.isSamplingEnabled());
Assert.assertEquals(200d, fetcher.getMaxLogSizeInMB(), 0.0001);
Assert.assertEquals(TimeZone.getTimeZone("PST"), fetcher.getTimeZone());

List<Object> list = new ArrayList<Object>();
int listLen = fetcher.MAX_SAMPLE_SIZE * 2;
Expand All @@ -119,6 +122,7 @@ public void testFetcherEmptyConf() {
fetcherConf.getFetchersConfigurationData().get(0));
Assert.assertFalse("Sampling should be disabled in default", fetcher.isSamplingEnabled());
Assert.assertEquals(fetcher.DEFALUT_MAX_LOG_SIZE_IN_MB, fetcher.getMaxLogSizeInMB(), 0.0001);
Assert.assertEquals(TimeZone.getDefault(), fetcher.getTimeZone());

List<Object> list = new ArrayList<Object>();
int listLen = fetcher.MAX_SAMPLE_SIZE * 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,40 @@ class SparkRestClientTest extends AsyncFunSpec with Matchers {
assertion
}
}

// Verifies that SparkRestClient works when the configured history server
// address already carries an explicit "http://" scheme (as written by recent
// CDH versions), exercising the scheme-detection branch in SparkRestClient.
it("returns the desired data from the Spark REST API for cluster mode application when http in jobhistory address") {
import ExecutionContext.Implicits.global
// Fake Jersey server exposing the cluster-mode REST fixtures that the
// client is expected to call (api, application, jobs, stages, executors).
val fakeJerseyServer = new FakeJerseyServer() {
override def configure(): Application = super.configure() match {
case resourceConfig: ResourceConfig =>
resourceConfig
.register(classOf[FetchClusterModeDataFixtures.ApiResource])
.register(classOf[FetchClusterModeDataFixtures.ApplicationResource])
.register(classOf[FetchClusterModeDataFixtures.JobsResource])
.register(classOf[FetchClusterModeDataFixtures.StagesResource])
.register(classOf[FetchClusterModeDataFixtures.ExecutorsResource])
case config => config
}
}

fakeJerseyServer.setUp()

val historyServerUri = fakeJerseyServer.target.getUri

// Key difference from the sibling test: the address is given WITH the
// "http://" prefix, which the client must accept as-is.
val sparkConf = new SparkConf().set("spark.yarn.historyServer.address", s"http://${historyServerUri.getHost}:${historyServerUri.getPort}")
val sparkRestClient = new SparkRestClient(sparkConf)

sparkRestClient.fetchData(FetchClusterModeDataFixtures.APP_ID) map { restDerivedData =>
restDerivedData.applicationInfo.id should be(FetchClusterModeDataFixtures.APP_ID)
restDerivedData.applicationInfo.name should be(FetchClusterModeDataFixtures.APP_NAME)
restDerivedData.jobDatas should not be(None)
restDerivedData.stageDatas should not be(None)
restDerivedData.executorSummaries should not be(None)
// Tear the server down whether the fetch succeeded or failed, then
// propagate the original assertion result.
} andThen { case assertion: Try[Assertion] =>
fakeJerseyServer.tearDown()
assertion
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<params>
<sampling_enabled>true</sampling_enabled>
<history_log_size_limit_in_mb>200</history_log_size_limit_in_mb>
<history_server_time_zone>PST</history_server_time_zone>
</params>
</fetcher>
</fetchers>

0 comments on commit 036d197

Please sign in to comment.