diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
index 9e964e1381fe5..0245eb33e7120 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
@@ -102,6 +102,17 @@ public class JHAdminConfig {
       MR_HISTORY_PREFIX + "intermediate-user-done-dir.permissions";
   public static final short DEFAULT_MR_HISTORY_INTERMEDIATE_USER_DONE_DIR_PERMISSIONS = 0770;
+
+  /**
+   * Interval, in ms, between scans of the done dir for new history files.
+   */
+  public static final String MR_HISTORY_READ_ONLY_INTERVAL_MS =
+      MR_HISTORY_PREFIX + "read-only.interval-ms";
+  public static final long DEFAULT_MR_HISTORY_READ_ONLY_INTERVAL_MS =
+      60 * 1000L; // 60 seconds
+  /** Path pattern for history files in the done dir; enables read-only mode. */
+  public static final String MR_HISTORY_READ_ONLY_DIR_PATTERN =
+      MR_HISTORY_PREFIX + "read-only.dir-pattern";
 
   /** Size of the job list cache.*/
   public static final String MR_HISTORY_JOBLIST_CACHE_SIZE =
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
index 825fb259dc9df..6a4b01a4cd936 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
@@ -21,7 +21,9 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.ConnectException;
+import java.time.ZonedDateTime;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -47,6 +49,7 @@
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
@@ -531,7 +534,7 @@ private boolean isOversized() {
     }
 
     public synchronized void waitUntilMoved() {
-      while (isMovePending() && !didMoveFail()) {
+      while (!isReadOnlyMode() && isMovePending() && !didMoveFail()) {
         try {
           wait();
         } catch (InterruptedException e) {
@@ -578,6 +581,9 @@ public synchronized void waitUntilMoved() {
    */
   private int maxTasksForLoadedJob = -1;
 
+  private String readOnlyDirectoryPattern;
+  private FileSystem readOnlyDirFs;
+
   public HistoryFileManager() {
     super(HistoryFileManager.class.getName());
   }
@@ -594,7 +600,12 @@ protected void serviceInit(Configuration conf) throws Exception {
     long maxFSWaitTime = conf.getLong(
         JHAdminConfig.MR_HISTORY_MAX_START_WAIT_TIME,
         JHAdminConfig.DEFAULT_MR_HISTORY_MAX_START_WAIT_TIME);
-    createHistoryDirs(SystemClock.getInstance(), 10 * 1000, maxFSWaitTime);
+
+    readOnlyDirectoryPattern = conf.get(JHAdminConfig.MR_HISTORY_READ_ONLY_DIR_PATTERN, "");
+
+    if (!isReadOnlyMode()) {
+      createHistoryDirs(SystemClock.getInstance(), 10 * 1000, maxFSWaitTime);
+    }
 
     maxTasksForLoadedJob = conf.getInt(
         JHAdminConfig.MR_HS_LOADED_JOBS_TASKS_MAX,
@@ -611,10 +622,13 @@ protected void serviceInit(Configuration conf) throws Exception {
         JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
         JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE));
 
-    int numMoveThreads = conf.getInt(
-        JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
-        JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
-    moveToDoneExecutor = createMoveToDoneThreadPool(numMoveThreads);
+    if (!isReadOnlyMode()) {
+      int numMoveThreads = conf.getInt(
+          JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
+          JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
+      moveToDoneExecutor = createMoveToDoneThreadPool(numMoveThreads);
+    }
+
     super.serviceInit(conf);
   }
 
@@ -782,6 +796,14 @@ protected HistoryFileInfo createHistoryFileInfo(Path historyFile,
   @SuppressWarnings("unchecked")
   void initExisting() throws IOException {
     LOG.info("Initializing Existing Jobs...");
+    if (isReadOnlyMode()) {
+      Path readOnlyPath = new Path(readOnlyDirectoryPattern);
+      readOnlyDirFs = readOnlyPath.getFileSystem(conf);
+      scanDoneDirectoryInit();
+
+      return;
+    }
+
     List<FileStatus> timestampedDirList = findTimestampedDirectories();
     // Sort first just so insertion is in a consistent order
     Collections.sort(timestampedDirList);
@@ -894,6 +916,10 @@ protected List<FileStatus> scanDirectoryForHistoryFiles(Path path,
       FileContext fc) throws IOException {
     return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
   }
+
+  protected boolean isReadOnlyMode() {
+    return !readOnlyDirectoryPattern.isEmpty();
+  }
 
   /**
    * Finds all history directories with a timestamp component by scanning the
@@ -907,6 +933,74 @@ protected List<FileStatus> findTimestampedDirectories() throws IOException {
     return fsList;
   }
 
+  /**
+   * Scans the file system to find history files for jobs executed yesterday,
+   * today, and the next day.
+   *
+   * @throws IOException
+   *           if there was an error while scanning
+   */
+  void scanDoneDirectoryUpdate() throws IOException {
+    ZonedDateTime today = ZonedDateTime.now();
+    ZonedDateTime previousDay = today.minusDays(1);
+    ZonedDateTime nextDay = today.plusDays(1);
+
+    StringBuilder scanDatesPattern = new StringBuilder("{")
+        .append(JobHistoryUtils.timestampDirectoryComponent(previousDay.toInstant().toEpochMilli()))
+        .append(",")
+        .append(JobHistoryUtils.timestampDirectoryComponent(today.toInstant().toEpochMilli()))
+        .append(",")
+        .append(JobHistoryUtils.timestampDirectoryComponent(nextDay.toInstant().toEpochMilli()))
+        .append("}");
+
+    scanDoneDirectory(new Path(readOnlyDirectoryPattern, scanDatesPattern + "/*/*"));
+  }
+
+  /**
+   * Scans the file system to find all the existing history files.
+   *
+   * @throws IOException
+   *           if there was an error while scanning
+   */
+  void scanDoneDirectoryInit() throws IOException {
+    scanDoneDirectory(new Path(readOnlyDirectoryPattern, "*/*/*/*/*"));
+  }
+
+  private void scanDoneDirectory(Path pathPattern) throws IOException {
+    FileStatus[] statuses = readOnlyDirFs.globStatus(
+        pathPattern, JobHistoryUtils.getHistoryFileFilter());
+    // globStatus may return null when nothing matching the pattern exists yet
+    List<FileStatus> fileStatusList = statuses == null
+        ? Collections.<FileStatus>emptyList() : Arrays.asList(statuses);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Found " + fileStatusList.size() + " files");
+    }
+
+    for (FileStatus fs : fileStatusList) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Scanning file: " + fs.getPath());
+      }
+
+      try {
+        JobIndexInfo jobIndexInfo =
+            FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
+        String confFileName =
+            JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
+        String summaryFileName =
+            JobHistoryUtils.getIntermediateSummaryFileName(jobIndexInfo.getJobId());
+        HistoryFileInfo fileInfo =
+            createHistoryFileInfo(
+                fs.getPath(),
+                new Path(fs.getPath().getParent(), confFileName),
+                new Path(fs.getPath().getParent(), summaryFileName),
+                jobIndexInfo,
+                false);
+
+        jobListCache.addIfAbsent(fileInfo);
+      } catch (Exception ex) {
+        LOG.error("Found exception while looking for history files in "
+            + fs.getPath(), ex);
+      }
+    }
+  }
+
   /**
    * Scans the intermediate directory to find user directories. Scans these for
    * history files if the modification time for the directory has changed. Once
@@ -1061,7 +1155,12 @@ private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException {
   }
 
   public Collection<HistoryFileInfo> getAllFileInfo() throws IOException {
-    scanIntermediateDirectory();
+    if (isReadOnlyMode()) {
+      scanDoneDirectoryUpdate();
+    } else {
+      scanIntermediateDirectory();
+    }
+
     return jobListCache.values();
   }
 
@@ -1071,8 +1170,12 @@ public HistoryFileInfo getFileInfo(JobId jobId) throws IOException {
     if (fileInfo != null) {
       return fileInfo;
     }
-    // OK so scan the intermediate to be sure we did not lose it that way
-    scanIntermediateDirectory();
+    if (isReadOnlyMode()) {
+      scanDoneDirectoryUpdate();
+    } else {
+      // OK, scan the intermediate dir to be sure we did not lose it that way
+      scanIntermediateDirectory();
+    }
     fileInfo = jobListCache.get(jobId);
     if (fileInfo != null) {
       return fileInfo;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
index a7d1370b168b7..e392fc6cdfe43 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
@@ -70,6 +70,8 @@ public class JobHistory extends AbstractService implements HistoryContext {
   // Time interval for the move thread.
   private long moveThreadInterval;
 
+  private long readOnlyModeScanInterval;
+
   private Configuration conf;
 
   private ScheduledThreadPoolExecutor scheduledExecutor = null;
@@ -93,6 +95,10 @@ protected void serviceInit(Configuration conf) throws Exception {
         JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
         JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);
 
+    readOnlyModeScanInterval = conf.getLong(
+        JHAdminConfig.MR_HISTORY_READ_ONLY_INTERVAL_MS,
+        JHAdminConfig.DEFAULT_MR_HISTORY_READ_ONLY_INTERVAL_MS);
+
     hsManager = createHistoryFileManager();
     hsManager.init(conf);
     try {
@@ -132,11 +138,17 @@ protected void serviceStart() throws Exception {
         new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d")
             .build());
 
-    scheduledExecutor.scheduleAtFixedRate(new MoveIntermediateToDoneRunnable(),
-        moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS);
+    if (hsManager.isReadOnlyMode()) {
+      scheduledExecutor.scheduleAtFixedRate(new ScanDoneDirectoryRunnable(),
+          readOnlyModeScanInterval, readOnlyModeScanInterval, TimeUnit.MILLISECONDS);
+    } else {
+      scheduledExecutor.scheduleAtFixedRate(new MoveIntermediateToDoneRunnable(),
+          moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS);
+
+      // Start historyCleaner
+      scheduleHistoryCleaner();
+    }
 
-    // Start historyCleaner
-    scheduleHistoryCleaner();
     super.serviceStart();
   }
 
@@ -195,6 +207,19 @@ public void run() {
       }
     }
   }
+
+  private class ScanDoneDirectoryRunnable implements Runnable {
+    @Override
+    public void run() {
+      try {
+        LOG.info("Starting scan of history files in done directory");
+        hsManager.scanDoneDirectoryUpdate();
+        LOG.info("Completed scan of history files in done directory");
+      } catch (IOException e) {
+        LOG.error("Error while scanning done dir", e);
+      }
+    }
+  }
 
   private class HistoryCleaner implements Runnable {
     public void run() {
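
Usage sketch (reviewer note, not part of the patch): a minimal example of enabling the read-only mode introduced above. It relies only on what this diff adds (MR_HISTORY_READ_ONLY_DIR_PATTERN, MR_HISTORY_READ_ONLY_INTERVAL_MS, isReadOnlyMode()) plus the standard AbstractService lifecycle; the HDFS path and the class name are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;

public class ReadOnlyHistoryExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // A non-empty dir pattern is what switches HistoryFileManager into
    // read-only mode (see isReadOnlyMode()); this path is hypothetical.
    conf.set(JHAdminConfig.MR_HISTORY_READ_ONLY_DIR_PATTERN,
        "hdfs://namenode:8020/mr-history/done");
    // Rescan the done directory every 30 seconds instead of the 60s default.
    conf.setLong(JHAdminConfig.MR_HISTORY_READ_ONLY_INTERVAL_MS, 30 * 1000L);

    JobHistory jobHistory = new JobHistory();
    jobHistory.init(conf);  // skips createHistoryDirs() and the move thread pool
    jobHistory.start();     // schedules ScanDoneDirectoryRunnable, no cleaner
  }
}

In this mode the server only globs under the configured pattern ("*/*/*/*/*" at init, then the three-day {yesterday,today,next-day} date pattern on each periodic scan) and populates jobListCache from whatever it finds; it never moves intermediate files and never deletes anything from the done directory.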