@@ -21,7 +21,6 @@
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;
import org.apache.commons.collections.map.LRUMap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
@@ -32,7 +31,6 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.timeline.*;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
@@ -41,14 +39,14 @@
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.*;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.*;
@@ -242,19 +240,7 @@ protected void serviceInit(Configuration conf) throws Exception {
IOUtils.cleanupWithLogger(LOG, localFS);
}
LOG.info("Using leveldb path " + dbPath);
try {
db = factory.open(new File(dbPath.toString()), options);
} catch (IOException ioe) {
File dbFile = new File(dbPath.toString());
File backupPath = new File(
dbPath.toString() + BACKUP_EXT + Time.monotonicNow());
LOG.warn("Incurred exception while loading LevelDb database. Backing " +
"up at "+ backupPath, ioe);
FileUtils.copyDirectory(dbFile, backupPath);
LOG.warn("Going to try repair");
factory.repair(dbFile, options);
db = factory.open(dbFile, options);
}
db = LeveldbUtils.loadOrRepairLevelDb(factory, dbPath, options);
checkVersion();
startTimeWriteCache =
Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
@@ -21,7 +21,6 @@
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -62,6 +61,7 @@
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.timeline.RollingLevelDB.RollingWriteBatch;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;

@@ -199,6 +199,11 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
static final String STARTTIME = "starttime-ldb";
static final String OWNER = "owner-ldb";

@VisibleForTesting
// Extension to FILENAME where backup will be stored in case we need to
// call LevelDb recovery
static final String BACKUP_EXT = ".backup-";

private static final byte[] DOMAIN_ID_COLUMN = "d".getBytes(UTF_8);
private static final byte[] EVENTS_COLUMN = "e".getBytes(UTF_8);
private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(UTF_8);
@@ -240,6 +245,12 @@ public RollingLevelDBTimelineStore() {
super(RollingLevelDBTimelineStore.class.getName());
}

private JniDBFactory factory;
@VisibleForTesting
void setFactory(JniDBFactory fact) {
this.factory = fact;
}

@Override
@SuppressWarnings("unchecked")
protected void serviceInit(Configuration conf) throws Exception {
@@ -284,7 +295,9 @@ protected void serviceInit(Configuration conf) throws Exception {
options.cacheSize(conf.getLong(
TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
JniDBFactory factory = new JniDBFactory();
if (factory == null) {
factory = new JniDBFactory();
}
Path dbPath = new Path(
conf.get(TIMELINE_SERVICE_LEVELDB_PATH), FILENAME);
Path domainDBPath = new Path(dbPath, DOMAIN);
@@ -327,13 +340,13 @@ protected void serviceInit(Configuration conf) throws Exception {
TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE));
LOG.info("Using leveldb path " + dbPath);
domaindb = factory.open(new File(domainDBPath.toString()), options);
domaindb = LeveldbUtils.loadOrRepairLevelDb(factory, domainDBPath, options);
entitydb = new RollingLevelDB(ENTITY);
entitydb.init(conf);
indexdb = new RollingLevelDB(INDEX);
indexdb.init(conf);
starttimedb = factory.open(new File(starttimeDBPath.toString()), options);
ownerdb = factory.open(new File(ownerDBPath.toString()), options);
starttimedb = LeveldbUtils.loadOrRepairLevelDb(factory, starttimeDBPath, options);
ownerdb = LeveldbUtils.loadOrRepairLevelDb(factory, ownerDBPath, options);
checkVersion();
startTimeWriteCache = Collections.synchronizedMap(new LRUMap(
getStartTimeWriteCacheSize(conf)));
@@ -346,7 +359,7 @@ protected void serviceInit(Configuration conf) throws Exception {

super.serviceInit(conf);
}

@Override
protected void serviceStart() throws Exception {
if (getConfig().getBoolean(TIMELINE_SERVICE_TTL_ENABLE, true)) {
@@ -1816,4 +1829,4 @@ private static TimelineDomain getTimelineDomain(DBIterator iterator,
return domain;
}
}
}
}
@@ -19,16 +19,30 @@
package org.apache.hadoop.yarn.server.timeline.util;


import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.util.Time;

import java.io.File;
import java.io.IOException;

import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;

public class LeveldbUtils {

private static final String BACKUP_EXT = ".backup-";
private static final Logger LOG = LoggerFactory
.getLogger(LeveldbUtils.class);

/** A string builder utility for building timeline server leveldb keys. */
public static class KeyBuilder {
/** Maximum subkeys that can be added to construct a key. */
@@ -184,4 +198,22 @@ public static boolean prefixMatches(byte[] prefix, int prefixlen,
public static final FsPermission LEVELDB_DIR_UMASK = FsPermission
.createImmutable((short) 0700);

public static DB loadOrRepairLevelDb(JniDBFactory factory, Path dbPath, Options options)
throws IOException {
DB db;
try {
db = factory.open(new File(dbPath.toString()), options);
} catch (IOException ioe) {
// Open failed: back up the existing directory, attempt a repair, and
// retry the open so the caller still gets a usable DB handle.
File dbFile = new File(dbPath.toString());
File dbBackupPath = new File(
dbPath.toString() + BACKUP_EXT + Time.monotonicNow());
LOG.warn("Incurred exception while loading LevelDb database. Backing " +
"up at " + dbBackupPath, ioe);
FileUtils.copyDirectory(dbFile, dbBackupPath);
factory.repair(dbFile, options);
db = factory.open(dbFile, options);
}
return db;
}

}
@@ -21,8 +21,10 @@
import static org.junit.Assert.assertNotNull;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;

import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -38,11 +40,15 @@
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version;

import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.Options;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.eclipse.jetty.util.log.Log;
import org.mockito.Mockito;

/** Test class to verify RollingLevelDBTimelineStore. */
@InterfaceAudience.Private
@@ -417,11 +423,41 @@ public void testStorePerformance() throws IOException {
Log.getLog().info("Duration for " + num + ": " + duration);
}

@Test
/**
* Test that RollingLevelDb repair is attempted at least once during
* serviceInit for RollingLeveldbTimelineStore in case open fails the
* first time.
*/
public void testLevelDbRepair() throws IOException {
RollingLevelDBTimelineStore store = new RollingLevelDBTimelineStore();
JniDBFactory factory = Mockito.mock(JniDBFactory.class);
Mockito.when(factory.open(Mockito.any(File.class), Mockito.any(Options.class)))
.thenThrow(new IOException()).thenCallRealMethod();
store.setFactory(factory);

// Create the LevelDb in a different location
File path = new File("target", this.getClass().getSimpleName() + "-tmpDir2").getAbsoluteFile();
Configuration conf = new Configuration(this.config);
conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, path.getAbsolutePath());
try {
store.init(conf);
Mockito.verify(factory, Mockito.times(1))
.repair(Mockito.any(File.class), Mockito.any(Options.class));
FilenameFilter fileFilter =
new WildcardFileFilter("*" + RollingLevelDBTimelineStore.BACKUP_EXT + "*");
Assert.assertTrue(new File(path.getAbsolutePath(), RollingLevelDBTimelineStore.FILENAME)
.list(fileFilter).length > 0);
} finally {
store.close();
fsContext.delete(new Path(path.getAbsolutePath()), true);
}
}

public static void main(String[] args) throws Exception {
TestRollingLevelDBTimelineStore store =
new TestRollingLevelDBTimelineStore();
store.setup();
store.testStorePerformance();
store.tearDown();
}
}
}
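
Below is a minimal standalone sketch of how the new LeveldbUtils.loadOrRepairLevelDb helper can be driven outside the timeline stores. The driver class name, database path, and key/value pair are hypothetical and only for illustration; the JniDBFactory, Options, and helper signature follow the patch above, and the backup/repair path is only exercised if the first open throws an IOException.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;

import static java.nio.charset.StandardCharsets.UTF_8;

/** Hypothetical driver class; not part of the patch. */
public class LoadOrRepairLevelDbExample {

  public static void main(String[] args) throws IOException {
    // Hypothetical local path; the stores derive theirs from
    // TIMELINE_SERVICE_LEVELDB_PATH during serviceInit.
    Path dbPath = new Path("/tmp/loadOrRepair-example-ldb");

    Options options = new Options();
    options.createIfMissing(true);

    // If the first open throws an IOException, the helper copies the
    // directory to <dbPath>.backup-<Time.monotonicNow()>, runs
    // factory.repair(), and reopens the repaired database.
    DB db = LeveldbUtils.loadOrRepairLevelDb(new JniDBFactory(), dbPath, options);
    try {
      db.put("example-key".getBytes(UTF_8), "example-value".getBytes(UTF_8));
    } finally {
      db.close();
    }
  }
}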