dev-support/pmd/pmd-ruleset.xml (1 addition, 0 deletions)
@@ -29,6 +29,7 @@
<rule ref="category/java/bestpractices.xml/ForLoopCanBeForeach"/>
<rule ref="category/java/bestpractices.xml/MissingOverride"/>
<rule ref="category/java/bestpractices.xml/UnusedPrivateMethod"/>
+ <rule ref="category/java/bestpractices.xml/UnusedPrivateField"/>

<exclude-pattern>.*/generated-sources/.*</exclude-pattern>
</ruleset>
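
For context on the new rule: PMD's UnusedPrivateField check reports private fields that are declared or assigned but never read anywhere in the class, which is exactly the pattern cleaned up in the files below. A minimal illustration (hypothetical class, not taken from this codebase):

public class Example {
  // Assigned in the constructor but never read afterwards: UnusedPrivateField reports this.
  private final String clusterId;
  // Read by getNodeId(), so the rule does not report it.
  private final String nodeId;

  public Example(String clusterId, String nodeId) {
    this.clusterId = clusterId;
    this.nodeId = nodeId;
  }

  public String getNodeId() {
    return nodeId;
  }
}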

@@ -628,7 +628,7 @@ CompletableFuture<PutBlockResult> executePutBlock(boolean close,
// never reach, just to make compiler happy.
return null;
}
- return flushFuture.thenApply(r -> new PutBlockResult(flushPos, asyncReply.getLogIndex(), r));
+ return flushFuture.thenApply(r -> new PutBlockResult(asyncReply.getLogIndex(), r));
}

@Override
@@ -997,7 +997,7 @@ private CompletableFuture<PutBlockResult> writeChunkToContainer(
// never reach.
return null;
}
- return validateFuture.thenApply(x -> new PutBlockResult(flushPos, asyncReply.getLogIndex(), x));
+ return validateFuture.thenApply(x -> new PutBlockResult(asyncReply.getLogIndex(), x));
}

private void handleSuccessfulPutBlock(
@@ -1227,12 +1227,10 @@ public int getReplicationIndex() {
}

static class PutBlockResult {
- private final long flushPosition;
private final long commitIndex;
private final ContainerCommandResponseProto response;

- PutBlockResult(long flushPosition, long commitIndex, ContainerCommandResponseProto response) {
- this.flushPosition = flushPosition;
+ PutBlockResult(long commitIndex, ContainerCommandResponseProto response) {
this.commitIndex = commitIndex;
this.response = response;
}

@@ -306,7 +306,7 @@ public CompletableFuture<PutBlockResult> executePutBlock(boolean close,
return null;
}
this.putBlkRspFuture = flushFuture;
- return flushFuture.thenApply(r -> new PutBlockResult(0, 0, r));
+ return flushFuture.thenApply(r -> new PutBlockResult(0, r));
}

/**

@@ -103,7 +103,6 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
private final Map<ContainerType, Handler> handlers;
private final ConfigurationSource conf;
private final ContainerSet containerSet;
- private final VolumeSet volumeSet;
private final StateContext context;
private final float containerCloseThreshold;
private final ProtocolMessageMetrics<ProtocolMessageEnum> protocolMetrics;
@@ -125,7 +124,6 @@ public HddsDispatcher(ConfigurationSource config, ContainerSet contSet,
TokenVerifier tokenVerifier) {
this.conf = config;
this.containerSet = contSet;
- this.volumeSet = volumes;
this.context = context;
this.handlers = handlers;
this.metrics = metrics;

@@ -71,26 +71,12 @@ public class HeartbeatEndpointTask
public static final Logger LOG =
LoggerFactory.getLogger(HeartbeatEndpointTask.class);
private final EndpointStateMachine rpcEndpoint;
- private final ConfigurationSource conf;
private DatanodeDetailsProto datanodeDetailsProto;
private StateContext context;
private int maxContainerActionsPerHB;
private int maxPipelineActionsPerHB;
private HDDSLayoutVersionManager layoutVersionManager;

- /**
- * Constructs a SCM heart beat.
- *
- * @param rpcEndpoint rpc Endpoint
- * @param conf Config.
- * @param context State context
- */
- public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint,
- ConfigurationSource conf, StateContext context) {
- this(rpcEndpoint, conf, context,
- context.getParent().getLayoutVersionManager());
- }
-
/**
* Constructs a SCM heart beat.
*
@@ -103,7 +89,6 @@ public HeartbeatEndpointTask(EndpointStateMachine rpcEndpoint,
ConfigurationSource conf, StateContext context,
HDDSLayoutVersionManager versionManager) {
this.rpcEndpoint = rpcEndpoint;
- this.conf = conf;
this.context = context;
this.maxContainerActionsPerHB = conf.getInt(HDDS_CONTAINER_ACTION_MAX_LIMIT,
HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT);

@@ -24,7 +24,6 @@
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.Callable;
- import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -48,8 +47,6 @@ public final class RegisterEndpointTask implements
static final Logger LOG = LoggerFactory.getLogger(RegisterEndpointTask.class);

private final EndpointStateMachine rpcEndPoint;
- private final ConfigurationSource conf;
- private Future<EndpointStateMachine.EndPointStates> result;
private DatanodeDetails datanodeDetails;
private final OzoneContainer datanodeContainerManager;
private StateContext stateContext;
@@ -59,34 +56,15 @@ public final class RegisterEndpointTask implements
* Creates a register endpoint task.
*
* @param rpcEndPoint - endpoint
- * @param conf - conf
- * @param ozoneContainer - container
- * @param context - State context
- */
- @VisibleForTesting
- public RegisterEndpointTask(EndpointStateMachine rpcEndPoint,
- ConfigurationSource conf,
- OzoneContainer ozoneContainer,
- StateContext context) {
- this(rpcEndPoint, conf, ozoneContainer, context,
- context.getParent().getLayoutVersionManager());
- }
-
- /**
- * Creates a register endpoint task.
- *
- * @param rpcEndPoint - endpoint
- * @param conf - conf
* @param ozoneContainer - container
* @param context - State context
* @param versionManager - layout version Manager
*/
@VisibleForTesting
public RegisterEndpointTask(EndpointStateMachine rpcEndPoint,
- ConfigurationSource conf, OzoneContainer ozoneContainer,
+ OzoneContainer ozoneContainer,
StateContext context, HDDSLayoutVersionManager versionManager) {
this.rpcEndPoint = rpcEndPoint;
- this.conf = conf;
this.datanodeContainerManager = ozoneContainer;
this.stateContext = context;
if (versionManager != null) {
@@ -305,7 +283,7 @@ public RegisterEndpointTask build() {
}

RegisterEndpointTask task = new RegisterEndpointTask(this
- .endPointStateMachine, this.conf, this.container, this.context,
+ .endPointStateMachine, this.container, this.context,
this.versionManager);
task.setDatanodeDetails(datanodeDetails);
return task;

@@ -70,7 +70,6 @@
public final class XceiverServerGrpc implements XceiverServerSpi {
private static final Logger
LOG = LoggerFactory.getLogger(XceiverServerGrpc.class);
- private static final String COMPONENT = "dn";
private int port;
private UUID id;
private Server server;

@@ -28,9 +28,6 @@
*/
public final class ContainerCacheMetrics {

- private final String name;
- private final MetricsSystem ms;
-
@Metric("Rate to measure the db open latency")
private MutableRate dbOpenLatency;

@@ -52,16 +49,14 @@ public final class ContainerCacheMetrics {
@Metric("Number of Container Cache Evictions")
private MutableCounterLong numCacheEvictions;

- private ContainerCacheMetrics(String name, MetricsSystem ms) {
- this.name = name;
- this.ms = ms;
+ private ContainerCacheMetrics() {
}

public static ContainerCacheMetrics create() {
MetricsSystem ms = DefaultMetricsSystem.instance();
String name = "ContainerCacheMetrics";

- return ms.register(name, "null", new ContainerCacheMetrics(name, ms));
+ return ms.register(name, "null", new ContainerCacheMetrics());
}

public void incNumDbGetOps() {

@@ -79,7 +79,6 @@ public class MutableVolumeSet implements VolumeSet {
private final ReentrantReadWriteLock volumeSetRWLock;

private final String datanodeUuid;
- private String clusterID;

private final StorageVolumeChecker volumeChecker;
private CheckedRunnable<IOException> failedVolumeListener;
@@ -100,7 +99,6 @@ public MutableVolumeSet(String dnUuid, String clusterID,
) throws IOException {
this.context = context;
this.datanodeUuid = dnUuid;
- this.clusterID = clusterID;
this.conf = conf;
this.volumeSetRWLock = new ReentrantReadWriteLock();
this.volumeChecker = volumeChecker;

@@ -215,18 +215,12 @@ private static final class LastCheckResult<V> {
@Nullable
private final V result;

- /**
- * Exception thrown by the check. null if it returned a result.
- */
- private final Throwable exception; // null on success.
-
/**
* Initialize with a result.
* @param result
*/
private LastCheckResult(V result, long completedAt) {
this.result = result;
- this.exception = null;
this.completedAt = completedAt;
}

@@ -237,7 +231,6 @@ private LastCheckResult(V result, long completedAt) {
*/
private LastCheckResult(Throwable t, long completedAt) {
this.result = null;
- this.exception = t;
this.completedAt = completedAt;
}
}

@@ -36,7 +36,6 @@
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
- import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
@@ -55,12 +54,11 @@ public class ChunkManagerDispatcher implements ChunkManager {
private final Map<ContainerLayoutVersion, ChunkManager> handlers
= new EnumMap<>(ContainerLayoutVersion.class);

- ChunkManagerDispatcher(boolean sync, BlockManager manager,
- VolumeSet volSet) {
+ ChunkManagerDispatcher(boolean sync, BlockManager manager) {
handlers.put(FILE_PER_CHUNK,
- new FilePerChunkStrategy(sync, manager, volSet));
+ new FilePerChunkStrategy(sync, manager));
handlers.put(FILE_PER_BLOCK,
- new FilePerBlockStrategy(sync, manager, volSet));
+ new FilePerBlockStrategy(sync, manager));
}

@Override

@@ -74,6 +74,6 @@ public static ChunkManager createChunkManager(ConfigurationSource conf,
return new ChunkManagerDummyImpl();
}

- return new ChunkManagerDispatcher(sync, manager, volSet);
+ return new ChunkManagerDispatcher(sync, manager);
}
}

@@ -50,7 +50,6 @@
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
- import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
@@ -76,10 +75,8 @@ public class FilePerBlockStrategy implements ChunkManager {
private final MappedBufferManager mappedBufferManager;

private final boolean readNettyChunkedNioFile;
- private final VolumeSet volumeSet;

- public FilePerBlockStrategy(boolean sync, BlockManager manager,
- VolumeSet volSet) {
+ public FilePerBlockStrategy(boolean sync, BlockManager manager) {
doSyncWrite = sync;
this.defaultReadBufferCapacity = manager == null ? 0 :
manager.getDefaultReadBufferCapacity();
@@ -88,7 +85,6 @@ public FilePerBlockStrategy(boolean sync, BlockManager manager,
this.readMappedBufferMaxCount = manager == null ? 0
: manager.getReadMappedBufferMaxCount();
LOG.info("ozone.chunk.read.mapped.buffer.max.count is load with {}", readMappedBufferMaxCount);
- this.volumeSet = volSet;
if (this.readMappedBufferMaxCount > 0) {
mappedBufferManager = new MappedBufferManager(this.readMappedBufferMaxCount);
} else {

@@ -42,7 +42,6 @@
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
- import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
@@ -67,10 +66,8 @@ public class FilePerChunkStrategy implements ChunkManager {
private final MappedBufferManager mappedBufferManager;

private final boolean readNettyChunkedNioFile;
- private final VolumeSet volumeSet;

- public FilePerChunkStrategy(boolean sync, BlockManager manager,
- VolumeSet volSet) {
+ public FilePerChunkStrategy(boolean sync, BlockManager manager) {
doSyncWrite = sync;
blockManager = manager;
this.defaultReadBufferCapacity = manager == null ? 0 :
@@ -80,7 +77,6 @@ public FilePerChunkStrategy(boolean sync, BlockManager manager,
this.readMappedBufferMaxCount = manager == null ? 0
: manager.getReadMappedBufferMaxCount();
LOG.info("ozone.chunk.read.mapped.buffer.max.count is load with {}", readMappedBufferMaxCount);
- this.volumeSet = volSet;
if (this.readMappedBufferMaxCount > 0) {
mappedBufferManager = new MappedBufferManager(this.readMappedBufferMaxCount);
} else {

@@ -38,12 +38,10 @@ public class MappedBufferManager {
new ConcurrentHashMap<String, WeakReference<ByteBuffer>>();
private static final Logger LOG = LoggerFactory.getLogger(MappedBufferManager.class);
private final Semaphore semaphore;
- private final int capacity;
private final AtomicBoolean cleanupInProgress = new AtomicBoolean(false);
private final Striped<Lock> lock;

public MappedBufferManager(int capacity) {
- this.capacity = capacity;
this.semaphore = new Semaphore(capacity);
this.lock = Striped.lazyWeakLock(1024);
}

@@ -164,9 +164,9 @@ private KeyValueContainerData createToDeleteBlocks(ContainerSet containerSet,
int numOfBlocksPerContainer, int numOfChunksPerBlock) throws IOException {
ChunkManager chunkManager;
if (layout == FILE_PER_BLOCK) {
- chunkManager = new FilePerBlockStrategy(true, null, null);
+ chunkManager = new FilePerBlockStrategy(true, null);
} else {
- chunkManager = new FilePerChunkStrategy(true, null, null);
+ chunkManager = new FilePerChunkStrategy(true, null);
}
byte[] arr = randomAlphanumeric(1048576).getBytes(UTF_8);
ChunkBuffer buffer = ChunkBuffer.wrap(ByteBuffer.wrap(arr));

@@ -129,7 +129,7 @@ public void setup() throws Exception {
StorageVolume.VolumeType.DATA_VOLUME, null);

blockManager = new BlockManagerImpl(conf);
- chunkManager = new FilePerBlockStrategy(true, blockManager, volumeSet);
+ chunkManager = new FilePerBlockStrategy(true, blockManager);

containerSet = new ContainerSet(1000);
keyValueHandler = new KeyValueHandler(conf, datanodeUuid,

@@ -68,7 +68,7 @@ public void updateConfig(OzoneConfiguration config) {
FILE_PER_CHUNK {
@Override
public ChunkManager createChunkManager(boolean sync, BlockManager manager) {
- return new FilePerChunkStrategy(sync, manager, null);
+ return new FilePerChunkStrategy(sync, manager);
}

@Override
@@ -85,7 +85,7 @@ public ContainerLayoutVersion getLayout() {
FILE_PER_BLOCK {
@Override
public ChunkManager createChunkManager(boolean sync, BlockManager manager) {
- return new FilePerBlockStrategy(sync, null, null);
+ return new FilePerBlockStrategy(sync, null);
}

@Override