@@ -134,7 +134,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
private final DataStreamOutput out;
private CompletableFuture<DataStreamReply> dataStreamCloseReply;
private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
-private final long syncSize = 0; // TODO: disk sync is disabled for now
+private static final long SYNC_SIZE = 0; // TODO: disk sync is disabled for now
private long syncPosition = 0;
private StreamBuffer currentBuffer;
private XceiverClientMetrics metrics;
@@ -630,9 +630,9 @@ public boolean isClosed() {
}

private boolean needSync(long position) {
-if (SYNC_SIZE > 0) {
+if (SYNC_SIZE > 0) {
// TODO: or position >= fileLength
-if (position - syncPosition >= syncSize) {
+if (position - syncPosition >= SYNC_SIZE) {
syncPosition = position;
return true;
}
@@ -70,7 +70,7 @@ public class TestDatanodeStateMachine {
LoggerFactory.getLogger(TestDatanodeStateMachine.class);
// Changing it to 1, as current code checks for multiple scm directories,
// and fail if exists
-private final int scmServerCount = 1;
+private static final int SCM_SERVER_COUNT = 1;
private List<String> serverAddresses;
private List<RPC.Server> scmServers;
private List<ScmTestMock> mockServers;
@@ -93,7 +93,7 @@ public void setUp() throws Exception {
serverAddresses = new ArrayList<>();
scmServers = new ArrayList<>();
mockServers = new ArrayList<>();
-for (int x = 0; x < scmServerCount; x++) {
+for (int x = 0; x < SCM_SERVER_COUNT; x++) {
int port = SCMTestUtils.getReuseableAddress().getPort();
String address = "127.0.0.1";
serverAddresses.add(address + ":" + port);
@@ -81,8 +81,8 @@ public class TestRDBSnapshotProvider {
private Set<TableConfig> configSet;
private RDBSnapshotProvider rdbSnapshotProvider;
private File testDir;
-private final int numUsedCF = 3;
-private final String leaderId = "leaderNode-1";
+private static final int NUM_USED_CF = 3;
+private static final String LEADER_ID = "leaderNode-1";
private final AtomicReference<DBCheckpoint> latestCK =
new AtomicReference<>(null);

@@ -109,7 +109,7 @@ public void close() {
public void downloadSnapshot(String leaderNodeID, File targetFile)
throws IOException {
for (int i = 0; i < 10; i++) {
-insertDataToDB(numUsedCF);
+insertDataToDB(NUM_USED_CF);
}
DBCheckpoint dbCheckpoint = rdbStore.getCheckpoint(true);
latestCK.set(dbCheckpoint);
@@ -151,30 +151,30 @@ public void testDownloadDBSnapshotFromLeader() throws Exception {
assertEquals(0, before);

// Get first snapshot
-checkpoint = rdbSnapshotProvider.downloadDBSnapshotFromLeader(leaderId);
+checkpoint = rdbSnapshotProvider.downloadDBSnapshotFromLeader(LEADER_ID);
File checkpointDir = checkpoint.getCheckpointLocation().toFile();
assertEquals(candidateDir, checkpointDir);
int first = HAUtils.getExistingSstFiles(
rdbSnapshotProvider.getCandidateDir()).size();

// Get second snapshot
-checkpoint = rdbSnapshotProvider.downloadDBSnapshotFromLeader(leaderId);
+checkpoint = rdbSnapshotProvider.downloadDBSnapshotFromLeader(LEADER_ID);
int second = HAUtils.getExistingSstFiles(
rdbSnapshotProvider.getCandidateDir()).size();
assertThat(second).withFailMessage("The second snapshot should have more SST files")
.isGreaterThan(first);
DBCheckpoint latestCheckpoint = latestCK.get();
compareDB(latestCheckpoint.getCheckpointLocation().toFile(),
-checkpoint.getCheckpointLocation().toFile(), numUsedCF);
+checkpoint.getCheckpointLocation().toFile(), NUM_USED_CF);

// Get third snapshot
-checkpoint = rdbSnapshotProvider.downloadDBSnapshotFromLeader(leaderId);
+checkpoint = rdbSnapshotProvider.downloadDBSnapshotFromLeader(LEADER_ID);
int third = HAUtils.getExistingSstFiles(
rdbSnapshotProvider.getCandidateDir()).size();
assertThat(third).withFailMessage("The third snapshot should have more SST files")
.isGreaterThan(second);
compareDB(latestCK.get().getCheckpointLocation().toFile(),
-checkpoint.getCheckpointLocation().toFile(), numUsedCF);
+checkpoint.getCheckpointLocation().toFile(), NUM_USED_CF);

// Test cleanup candidateDB
rdbSnapshotProvider.init();
@@ -172,7 +172,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,

private ColumnFamilyHandle snapshotInfoTableCFHandle;
private final AtomicInteger tarballRequestCount;
-private final String dagPruningServiceName = "CompactionDagPruningService";
+private static final String DAG_PRUNING_SERVICE_NAME = "CompactionDagPruningService";
private AtomicBoolean suspended;

private ColumnFamilyHandle compactionLogTableCFHandle;
@@ -230,7 +230,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
TimeUnit.MILLISECONDS);

if (pruneCompactionDagDaemonRunIntervalInMs > 0) {
-this.scheduler = new Scheduler(dagPruningServiceName,
+this.scheduler = new Scheduler(DAG_PRUNING_SERVICE_NAME,
true, 1);

this.scheduler.scheduleWithFixedDelay(
@@ -307,7 +307,7 @@ public void close() throws Exception {
if (!closed) {
closed = true;
if (scheduler != null) {
LOG.info("Shutting down {}.", dagPruningServiceName);
LOG.info("Shutting down {}.", DAG_PRUNING_SERVICE_NAME);
scheduler.close();
}
}
@@ -126,10 +126,10 @@ public class TestRocksDBCheckpointDiffer {

private final List<List<ColumnFamilyHandle>> colHandles = new ArrayList<>();

-private final String activeDbDirName = "./rocksdb-data";
-private final String metadataDirName = "./metadata";
-private final String compactionLogDirName = "compaction-log";
-private final String sstBackUpDirName = "compaction-sst-backup";
+private static final String ACTIVE_DB_DIR_NAME = "./rocksdb-data";
+private static final String METADATA_DIR_NAME = "./metadata";
+private static final String COMPACTION_LOG_DIR_NAME = "compaction-log";
+private static final String SST_BACK_UP_DIR_NAME = "compaction-sst-backup";
private File activeDbDir;
private File metadataDirDir;
private File compactionLogDir;
@@ -150,17 +150,17 @@ public void init() throws RocksDBException {
// Test class log level. Set to DEBUG for verbose output
GenericTestUtils.setLogLevel(TestRocksDBCheckpointDiffer.LOG, Level.INFO);

-activeDbDir = new File(activeDbDirName);
-createDir(activeDbDir, activeDbDirName);
+activeDbDir = new File(ACTIVE_DB_DIR_NAME);
+createDir(activeDbDir, ACTIVE_DB_DIR_NAME);

-metadataDirDir = new File(metadataDirName);
-createDir(metadataDirDir, metadataDirName);
+metadataDirDir = new File(METADATA_DIR_NAME);
+createDir(metadataDirDir, METADATA_DIR_NAME);

-compactionLogDir = new File(metadataDirName, compactionLogDirName);
-createDir(compactionLogDir, metadataDirName + "/" + compactionLogDirName);
+compactionLogDir = new File(METADATA_DIR_NAME, COMPACTION_LOG_DIR_NAME);
+createDir(compactionLogDir, METADATA_DIR_NAME + "/" + COMPACTION_LOG_DIR_NAME);

-sstBackUpDir = new File(metadataDirName, sstBackUpDirName);
-createDir(sstBackUpDir, metadataDirName + "/" + sstBackUpDirName);
+sstBackUpDir = new File(METADATA_DIR_NAME, SST_BACK_UP_DIR_NAME);
+createDir(sstBackUpDir, METADATA_DIR_NAME + "/" + SST_BACK_UP_DIR_NAME);

config = mock(ConfigurationSource.class);

@@ -174,10 +174,10 @@ public void init() throws RocksDBException {
OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS)).thenReturn(0L);

-rocksDBCheckpointDiffer = new RocksDBCheckpointDiffer(metadataDirName,
-sstBackUpDirName,
-compactionLogDirName,
-activeDbDirName,
+rocksDBCheckpointDiffer = new RocksDBCheckpointDiffer(METADATA_DIR_NAME,
+SST_BACK_UP_DIR_NAME,
+COMPACTION_LOG_DIR_NAME,
+ACTIVE_DB_DIR_NAME,
config);

ColumnFamilyOptions cfOpts = new ColumnFamilyOptions()
@@ -189,7 +189,7 @@ public void init() throws RocksDBException {
.setCreateMissingColumnFamilies(true);

rocksDBCheckpointDiffer.setRocksDBForCompactionTracking(dbOptions);
-activeRocksDB = RocksDB.open(dbOptions, activeDbDirName, cfDescriptors,
+activeRocksDB = RocksDB.open(dbOptions, ACTIVE_DB_DIR_NAME, cfDescriptors,
cfHandles);
keyTableCFHandle = cfHandles.get(1);
directoryTableCFHandle = cfHandles.get(2);
@@ -518,7 +518,7 @@ public void testGetSSTDiffListWithoutDB(String description,
@Test
void testDifferWithDB() throws Exception {
writeKeysAndCheckpointing();
-readRocksDBInstance(activeDbDirName, activeRocksDB, null,
+readRocksDBInstance(ACTIVE_DB_DIR_NAME, activeRocksDB, null,
rocksDBCheckpointDiffer);

if (LOG.isDebugEnabled()) {
@@ -614,7 +614,7 @@ private void createCheckpoint(RocksDB rocksDB) throws RocksDBException {
}
cpDirList.add(dir);

-createCheckPoint(activeDbDirName, cpPath, rocksDB);
+createCheckPoint(ACTIVE_DB_DIR_NAME, cpPath, rocksDB);
final UUID snapshotId = UUID.randomUUID();
List<ColumnFamilyHandle> colHandle = new ArrayList<>();
colHandles.add(colHandle);
@@ -1273,7 +1273,7 @@ public void testPruneOlderSnapshotsWithCompactionHistory(

if (compactionLogs != null) {
for (int i = 0; i < compactionLogs.size(); i++) {
-String compactionFileName = metadataDirName + "/" + compactionLogDirName
+String compactionFileName = METADATA_DIR_NAME + "/" + COMPACTION_LOG_DIR_NAME
+ "/0000" + i + COMPACTION_LOG_FILE_NAME_SUFFIX;
File compactionFile = new File(compactionFileName);
Files.write(compactionFile.toPath(),
@@ -1491,8 +1491,8 @@ public void testSstFilePruning(

Path compactionLogFilePath = null;
if (compactionLog != null) {
-String compactionLogFileName = metadataDirName + "/" +
-compactionLogDirName + "/compaction_log" +
+String compactionLogFileName = METADATA_DIR_NAME + "/" +
+COMPACTION_LOG_DIR_NAME + "/compaction_log" +
COMPACTION_LOG_FILE_NAME_SUFFIX;
compactionLogFilePath = new File(compactionLogFileName).toPath();
createFileWithContext(compactionLogFileName, compactionLog);
@@ -1512,7 +1512,7 @@

Set<String> actualFileSetAfterPruning;
try (Stream<Path> pathStream = Files.list(
-Paths.get(metadataDirName + "/" + sstBackUpDirName))
+Paths.get(METADATA_DIR_NAME + "/" + SST_BACK_UP_DIR_NAME))
.filter(e -> e.toString().toLowerCase()
.endsWith(SST_FILE_EXTENSION))
.sorted()) {
@@ -272,7 +272,7 @@ public int getNodeCount(NodeStatus nodeStatus) {
* Returns the Number of Datanodes by State they are in. Passing null for
* either of the states acts like a wildcard for that state.
*
-* @parem nodeOpState - The Operational State of the node
+* @param nodeOpState - The Operational State of the node
* @param health - The health of the node
* @return count
*/
@@ -60,13 +60,13 @@ public class TestS3GrpcOmTransport {
private static final Logger LOG =
LoggerFactory.getLogger(TestS3GrpcOmTransport.class);

-private final String leaderOMNodeId = "TestOM";
+private static final String LEADER_OM_NODE_ID = "TestOM";

private final OMResponse omResponse = OMResponse.newBuilder()
.setSuccess(true)
.setStatus(org.apache.hadoop.ozone.protocol
.proto.OzoneManagerProtocolProtos.Status.OK)
-.setLeaderOMNodeId(leaderOMNodeId)
+.setLeaderOMNodeId(LEADER_OM_NODE_ID)
.setCmdType(Type.AllocateBlock)
.build();

@@ -167,7 +167,7 @@ public void testSubmitRequestToServer() throws Exception {
final OMResponse resp = client.submitRequest(omRequest);
assertEquals(resp.getStatus(), org.apache.hadoop.ozone.protocol
.proto.OzoneManagerProtocolProtos.Status.OK);
-assertEquals(resp.getLeaderOMNodeId(), leaderOMNodeId);
+assertEquals(resp.getLeaderOMNodeId(), LEADER_OM_NODE_ID);
}

@Test
@@ -191,7 +191,7 @@ public void testGrpcFailoverProxy() throws Exception {
final OMResponse resp = client.submitRequest(omRequest);
assertEquals(resp.getStatus(), org.apache.hadoop.ozone.protocol
.proto.OzoneManagerProtocolProtos.Status.OK);
-assertEquals(resp.getLeaderOMNodeId(), leaderOMNodeId);
+assertEquals(resp.getLeaderOMNodeId(), LEADER_OM_NODE_ID);
}

@Test
@@ -63,7 +63,7 @@ public class MiniOzoneChaosCluster extends MiniOzoneHAClusterImpl {

private final FailureManager failureManager;

-private final int waitForClusterToBeReadyTimeout = 120000; // 2 min
+private static final int WAIT_FOR_CLUSTER_TO_BE_READY_TIMEOUT = 120000; // 2 min

private final Set<OzoneManager> failedOmSet;
private final Set<StorageContainerManager> failedScmSet;
@@ -158,7 +158,7 @@ public void waitForClusterToBeReady()
}
}
return true;
-}, 1000, waitForClusterToBeReadyTimeout);
+}, 1000, WAIT_FOR_CLUSTER_TO_BE_READY_TIMEOUT);
}

/**
@@ -84,16 +84,16 @@ public class TestOzoneFsHAURLs {
private String bucketName;
private String rootPath;

-private final String o3fsImplKey =
+private static final String O3FS_IMPL_KEY =
"fs." + OzoneConsts.OZONE_URI_SCHEME + ".impl";
-private final String o3fsImplValue =
+private static final String O3FS_IMPL_VALUE =
"org.apache.hadoop.fs.ozone.OzoneFileSystem";
private static OzoneClient client;

-private final String ofsImplKey =
+private static final String OFS_IMPL_KEY =
"fs." + OzoneConsts.OZONE_OFS_URI_SCHEME + ".impl";

-private final String ofsImplValue =
+private static final String OFS_IMPL_VALUE =
"org.apache.hadoop.fs.ozone.RootedOzoneFileSystem";


@@ -219,7 +219,7 @@ private int getPortFromAddress(String addr) {
public void testWithQualifiedDefaultFS() throws Exception {
OzoneConfiguration clientConf = new OzoneConfiguration(conf);
clientConf.setQuietMode(false);
-clientConf.set(o3fsImplKey, o3fsImplValue);
+clientConf.set(O3FS_IMPL_KEY, O3FS_IMPL_VALUE);
// fs.defaultFS = o3fs://bucketName.volumeName.omServiceId/
clientConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);

@@ -314,7 +314,7 @@ public void testWithQualifiedDefaultFS() throws Exception {
private void testWithDefaultFS(String defaultFS) throws Exception {
OzoneConfiguration clientConf = new OzoneConfiguration(conf);
clientConf.setQuietMode(false);
-clientConf.set(o3fsImplKey, o3fsImplValue);
+clientConf.set(O3FS_IMPL_KEY, O3FS_IMPL_VALUE);
// fs.defaultFS = file:///
clientConf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
defaultFS);
@@ -359,8 +359,8 @@ public void testOtherDefaultFS() throws Exception {
public void testIncorrectAuthorityInURI() throws Exception {
OzoneConfiguration clientConf = new OzoneConfiguration(conf);
clientConf.setQuietMode(false);
-clientConf.set(o3fsImplKey, o3fsImplValue);
-clientConf.set(ofsImplKey, ofsImplValue);
+clientConf.set(O3FS_IMPL_KEY, O3FS_IMPL_VALUE);
+clientConf.set(OFS_IMPL_KEY, OFS_IMPL_VALUE);
FsShell shell = new FsShell(clientConf);
String incorrectSvcId = "dummy";
String o3fsPathWithCorrectSvcId =
@@ -115,7 +115,7 @@ public class TestHDDSUpgrade {
private StorageContainerManager scm;
private ContainerManager scmContainerManager;
private PipelineManager scmPipelineManager;
-private final int numContainersCreated = 1;
+private static final int NUM_CONTAINERS_CREATED = 1;
private HDDSLayoutVersionManager scmVersionManager;
private AtomicBoolean testPassed = new AtomicBoolean(true);
private static
@@ -316,7 +316,7 @@ public void testFinalizationFromInitialVersionToLatestVersion()
// Verify Post-Upgrade conditions on the SCM.
TestHddsUpgradeUtils.testPostUpgradeConditionsSCM(
cluster.getStorageContainerManagersList(),
-numContainersCreated, NUM_DATA_NODES);
+NUM_CONTAINERS_CREATED, NUM_DATA_NODES);

// All datanodes on the SCM should have moved to HEALTHY-READONLY state.
TestHddsUpgradeUtils.testDataNodesStateOnSCM(
@@ -327,7 +327,7 @@
// In the happy path case, no containers should have been quasi closed as
// a result of the upgrade.
TestHddsUpgradeUtils.testPostUpgradeConditionsDataNodes(
-cluster.getHddsDatanodes(), numContainersCreated, CLOSED);
+cluster.getHddsDatanodes(), NUM_CONTAINERS_CREATED, CLOSED);

// Test that we can use a pipeline after upgrade.
// Will fail with exception if there are no pipelines.
@@ -871,7 +871,7 @@ public void testFinalizationWithFailureInjectionHelper(
// Verify Post-Upgrade conditions on the SCM.
// With failure injection
TestHddsUpgradeUtils.testPostUpgradeConditionsSCM(
-cluster.getStorageContainerManagersList(), numContainersCreated,
+cluster.getStorageContainerManagersList(), NUM_CONTAINERS_CREATED,
NUM_DATA_NODES);

// All datanodes on the SCM should have moved to HEALTHY-READONLY state.
@@ -898,7 +898,7 @@

// Verify the SCM has driven all the DataNodes through Layout Upgrade.
TestHddsUpgradeUtils.testPostUpgradeConditionsDataNodes(
-cluster.getHddsDatanodes(), numContainersCreated);
+cluster.getHddsDatanodes(), NUM_CONTAINERS_CREATED);

// Verify that new pipeline can be created with upgraded datanodes.
try {