Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,17 @@ public class JHAdminConfig {
MR_HISTORY_PREFIX + "intermediate-user-done-dir.permissions";
public static final short
DEFAULT_MR_HISTORY_INTERMEDIATE_USER_DONE_DIR_PERMISSIONS = 0770;

/**
 * Configuration key for the interval, in milliseconds, between periodic
 * scans of the done directory for history files. The value is used as both
 * the initial delay and the period of the done-directory scan task that is
 * scheduled when the history file manager is in read-only mode.
 */
public static final String MR_HISTORY_READ_ONLY_INTERVAL_MS =
    MR_HISTORY_PREFIX + "read-only.interval-ms";
/** Default done-directory scan interval: 60 seconds, in milliseconds. */
public static final long DEFAULT_MR_HISTORY_READ_ONLY_INTERVAL_MS =
    60 * 1000L;
/**
 * Configuration key for the path pattern under which history files are
 * looked up in the done directory. A non-empty value puts the history file
 * manager into read-only mode; the default is the empty string.
 */
public static final String MR_HISTORY_READ_ONLY_DIR_PATTERN =
    MR_HISTORY_PREFIX + "read-only.dir-pattern";

/** Size of the job list cache.*/
public static final String MR_HISTORY_JOBLIST_CACHE_SIZE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.ConnectException;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -47,6 +49,7 @@
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
Expand Down Expand Up @@ -531,7 +534,7 @@ private boolean isOversized() {
}

public synchronized void waitUntilMoved() {
while (isMovePending() && !didMoveFail()) {
while (!isReadOnlyMode() && isMovePending() && !didMoveFail()) {
try {
wait();
} catch (InterruptedException e) {
Expand Down Expand Up @@ -578,6 +581,9 @@ public synchronized void waitUntilMoved() {
*/
private int maxTasksForLoadedJob = -1;

private String readOnlyDirectoryPattern;
private FileSystem readOnlyDirFs;

/**
 * Creates the manager, passing this class's fully qualified name to the
 * superclass constructor.
 */
public HistoryFileManager() {
super(HistoryFileManager.class.getName());
}
Expand All @@ -594,7 +600,12 @@ protected void serviceInit(Configuration conf) throws Exception {
long maxFSWaitTime = conf.getLong(
JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME,
JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
createHistoryDirs(SystemClock.getInstance(), 10 * 1000, maxFSWaitTime);

readOnlyDirectoryPattern = conf.get(JHAdminConfig.MR_HISTORY_READ_ONLY_DIR_PATTERN, "");

if (!isReadOnlyMode()) {
createHistoryDirs(SystemClock.getInstance(), 10 * 1000, maxFSWaitTime);
}

maxTasksForLoadedJob = conf.getInt(
JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX,
Expand All @@ -611,10 +622,13 @@ protected void serviceInit(Configuration conf) throws Exception {
JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE));

int numMoveThreads = conf.getInt(
JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
moveToDoneExecutor = createMoveToDoneThreadPool(numMoveThreads);
if (!isReadOnlyMode()) {
int numMoveThreads = conf.getInt(
JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
moveToDoneExecutor = createMoveToDoneThreadPool(numMoveThreads);
}

super.serviceInit(conf);
}

Expand Down Expand Up @@ -782,6 +796,14 @@ protected HistoryFileInfo createHistoryFileInfo(Path historyFile,
@SuppressWarnings("unchecked")
void initExisting() throws IOException {
LOG.info("Initializing Existing Jobs...");
if (isReadOnlyMode()) {
Path readOnlyPath = new Path(readOnlyDirectoryPattern);
readOnlyDirFs = readOnlyPath.getFileSystem(conf);
scanDoneDirectoryInit();

return;
}

List<FileStatus> timestampedDirList = findTimestampedDirectories();
// Sort first just so insertion is in a consistent order
Collections.sort(timestampedDirList);
Expand Down Expand Up @@ -894,6 +916,10 @@ protected List<FileStatus> scanDirectoryForHistoryFiles(Path path,
FileContext fc) throws IOException {
return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
}

/**
 * Tells whether this manager operates in read-only mode, i.e. whether a
 * non-empty read-only directory pattern has been configured.
 *
 * @return {@code true} if a read-only directory pattern is set;
 *         {@code false} if it is empty or has not been assigned yet
 */
protected boolean isReadOnlyMode() {
  // The pattern is only assigned during serviceInit(); treat "not assigned
  // yet" as "not read-only" instead of failing with a NullPointerException.
  return readOnlyDirectoryPattern != null
      && !readOnlyDirectoryPattern.isEmpty();
}

/**
* Finds all history directories with a timestamp component by scanning the
Expand All @@ -907,6 +933,74 @@ protected List<FileStatus> findTimestampedDirectories() throws IOException {
return fsList;
}

/**
 * Rescans the read-only done directory for history files of jobs that belong
 * to the previous day, the current day, or the next day.
 *
 * <p>The three timestamp directory components are combined into one glob
 * alternation and every entry two levels below them is considered.
 *
 * @throws IOException
 *           if there was an error while scanning
 */
void scanDoneDirectoryUpdate() throws IOException {
  ZonedDateTime now = ZonedDateTime.now();

  String previousDayDir = JobHistoryUtils.timestampDirectoryComponent(
      now.minusDays(1).toInstant().toEpochMilli());
  String currentDayDir = JobHistoryUtils.timestampDirectoryComponent(
      now.toInstant().toEpochMilli());
  String nextDayDir = JobHistoryUtils.timestampDirectoryComponent(
      now.plusDays(1).toInstant().toEpochMilli());

  // Glob alternation: {<previous day>,<current day>,<next day>}
  String datesGlob =
      "{" + previousDayDir + "," + currentDayDir + "," + nextDayDir + "}";

  Path scanPattern = new Path(readOnlyDirectoryPattern, datesGlob + "/*/*");
  scanDoneDirectory(scanPattern);
}

/**
 * Performs the initial scan for existing history files: builds a glob that
 * matches every entry five levels below the configured read-only directory
 * and hands it to {@code scanDoneDirectory}.
 *
 * @throws IOException
 *           if there was an error while scanning
 */
void scanDoneDirectoryInit() throws IOException {
  final Path allHistoryFilesPattern =
      new Path(readOnlyDirectoryPattern, "*/*/*/*/*");
  scanDoneDirectory(allHistoryFilesPattern);
}

/**
 * Globs the read-only file system for history files matching
 * {@code pathPattern} and adds an entry for each match to the job list
 * cache, unless the cache already holds one.
 *
 * <p>For every matched file, the configuration and summary file paths are
 * derived from the job id parsed out of the file name and are resolved
 * against the matched file's parent directory. A failure while processing
 * one file is logged and does not stop the scan of the remaining files.
 *
 * @param pathPattern glob pattern selecting candidate history files
 * @throws IOException if the glob itself fails
 */
private void scanDoneDirectory(Path pathPattern) throws IOException {
  FileStatus[] globbed = readOnlyDirFs.globStatus(
      pathPattern, JobHistoryUtils.getHistoryFileFilter());

  // globStatus may return null instead of an empty array (e.g. when the
  // path does not exist); treat that as "no matches" rather than letting
  // Arrays.asList(null) throw a NullPointerException.
  List<FileStatus> fileStatusList = globbed == null
      ? Collections.<FileStatus>emptyList()
      : Arrays.asList(globbed);

  if (LOG.isDebugEnabled()) {
    LOG.debug("Found " + fileStatusList.size() + " files");
  }

  for (FileStatus fs : fileStatusList) {
    Path historyFile = fs.getPath();
    if (LOG.isDebugEnabled()) {
      LOG.debug("scanning file: "+ historyFile);
    }

    try {
      JobIndexInfo jobIndexInfo =
          FileNameIndexUtils.getIndexInfo(historyFile.getName());
      String confFileName =
          JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
      String summaryFileName =
          JobHistoryUtils.getIntermediateSummaryFileName(jobIndexInfo.getJobId());

      Path parentDir = historyFile.getParent();
      // NOTE(review): the meaning of the trailing boolean is defined by
      // createHistoryFileInfo, which is not visible here; kept as false.
      HistoryFileInfo fileInfo =
          createHistoryFileInfo(
              historyFile,
              new Path(parentDir, confFileName),
              new Path(parentDir, summaryFileName),
              jobIndexInfo,
              false);

      jobListCache.addIfAbsent(fileInfo);
    } catch (Exception ex) {
      // Deliberately broad: one malformed or unreadable file must not
      // abort the scan of the others.
      LOG.error("Found exception while looking for history files in " + historyFile, ex);
    }
  }
}

/**
* Scans the intermediate directory to find user directories. Scans these for
* history files if the modification time for the directory has changed. Once
Expand Down Expand Up @@ -1061,7 +1155,12 @@ private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
}

/**
 * Refreshes the job list cache from the file system and returns all of its
 * entries.
 *
 * <p>In read-only mode the done directory is rescanned; otherwise the
 * intermediate directory is scanned.
 *
 * @return the values currently held by the job list cache
 * @throws IOException if the directory scan fails
 */
public Collection<HistoryFileInfo> getAllFileInfo() throws IOException {
  // Scan exactly one source. The intermediate directory must not be scanned
  // unconditionally: in read-only mode the history directories are never
  // set up by serviceInit(), and in normal mode it would be scanned twice.
  if (isReadOnlyMode()) {
    scanDoneDirectoryUpdate();
  } else {
    scanIntermediateDirectory();
  }

  return jobListCache.values();
}

Expand All @@ -1071,8 +1170,12 @@ public HistoryFileInfo getFileInfo(JobId jobId) throws IOException {
if (fileInfo != null) {
return fileInfo;
}
// OK so scan the intermediate to be sure we did not lose it that way
scanIntermediateDirectory();
if (isReadOnlyMode()) {
scanDoneDirectoryUpdate();
} else {
// OK so scan the intermediate to be sure we did not lose it that way
scanIntermediateDirectory();
}
fileInfo = jobListCache.get(jobId);
if (fileInfo != null) {
return fileInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class JobHistory extends AbstractService implements HistoryContext {
// Time interval for the move thread.
private long moveThreadInterval;

private long readOnlyModeScanInterval;

private Configuration conf;

private ScheduledThreadPoolExecutor scheduledExecutor = null;
Expand All @@ -93,6 +95,10 @@ protected void serviceInit(Configuration conf) throws Exception {
JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);

readOnlyModeScanInterval = conf.getLong(
JHAdminConfig.MR_HISTORY_READ_ONLY_INTERVAL_MS,
JHAdminConfig.DEFAULT_MR_HISTORY_READ_ONLY_INTERVAL_MS);

hsManager = createHistoryFileManager();
hsManager.init(conf);
try {
Expand Down Expand Up @@ -132,11 +138,17 @@ protected void serviceStart() throws Exception {
new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d")
.build());

scheduledExecutor.scheduleAtFixedRate(new MoveIntermediateToDoneRunnable(),
moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS);
if (hsManager.isReadOnlyMode()) {
scheduledExecutor.scheduleAtFixedRate(new ScanDoneDirectoryRunnable(),
readOnlyModeScanInterval, readOnlyModeScanInterval, TimeUnit.MILLISECONDS);
} else {
scheduledExecutor.scheduleAtFixedRate(new MoveIntermediateToDoneRunnable(),
moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS);

// Start historyCleaner
scheduleHistoryCleaner();
}

// Start historyCleaner
scheduleHistoryCleaner();
super.serviceStart();
}

Expand Down Expand Up @@ -195,6 +207,19 @@ public void run() {
}
}
}

/**
 * Periodic task that asks the history file manager to rescan the done
 * directory for history files. Failures are logged and never propagated.
 */
private class ScanDoneDirectoryRunnable implements Runnable {
  @Override
  public void run() {
    try {
      LOG.info("Starting scan of history files in done directory");
      hsManager.scanDoneDirectoryUpdate();
      LOG.info("Completed scan of history files in done directory");
    } catch (IOException | RuntimeException e) {
      // Unchecked exceptions are caught as well: an exception escaping
      // run() makes a fixed-rate scheduled executor suppress every later
      // execution, which would silently stop the periodic scan.
      LOG.error("Error while scanning done dir ", e);
    }
  }
}

private class HistoryCleaner implements Runnable {
public void run() {
Expand Down