Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -253,28 +253,30 @@ private void deleteKeyValueContainerBlocks(
}
}

// Finally commit the DB counters.
BatchOperation batchOperation = new BatchOperation();
if (newDeletionBlocks > 0) {
// Finally commit the DB counters.
BatchOperation batchOperation = new BatchOperation();

// In memory is updated only when existing delete transactionID is
// greater.
if (delTX.getTxID() > containerData.getDeleteTransactionId()) {
// Update in DB pending delete key count and delete transaction ID.
batchOperation.put(DB_CONTAINER_DELETE_TRANSACTION_KEY,
Longs.toByteArray(delTX.getTxID()));
}
// In memory is updated only when existing delete transactionID is
// greater.
if (delTX.getTxID() > containerData.getDeleteTransactionId()) {
// Update in DB pending delete key count and delete transaction ID.
batchOperation.put(DB_CONTAINER_DELETE_TRANSACTION_KEY,
Longs.toByteArray(delTX.getTxID()));
}

batchOperation.put(DB_PENDING_DELETE_BLOCK_COUNT_KEY, Longs.toByteArray(
containerData.getNumPendingDeletionBlocks() + newDeletionBlocks));
batchOperation.put(DB_PENDING_DELETE_BLOCK_COUNT_KEY, Longs.toByteArray(
containerData.getNumPendingDeletionBlocks() + newDeletionBlocks));

containerDB.getStore().writeBatch(batchOperation);
containerDB.getStore().writeBatch(batchOperation);


// update pending deletion blocks count and delete transaction ID in
// in-memory container status
containerData.updateDeleteTransactionId(delTX.getTxID());
// update pending deletion blocks count and delete transaction ID in
// in-memory container status
containerData.updateDeleteTransactionId(delTX.getTxID());

containerData.incrPendingDeletionBlocks(newDeletionBlocks);
containerData.incrPendingDeletionBlocks(newDeletionBlocks);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,14 +420,16 @@ ContainerCommandResponseProto handlePutBlock(
BlockData blockData = BlockData.getFromProtoBuf(data);
Preconditions.checkNotNull(blockData);

boolean incrKeyCount = false;
if (!request.getPutBlock().hasEof() || request.getPutBlock().getEof()) {
chunkManager.finishWriteChunks(kvContainer, blockData);
incrKeyCount = true;
}

long bcsId =
dispatcherContext == null ? 0 : dispatcherContext.getLogIndex();
blockData.setBlockCommitSequenceId(bcsId);
blockManager.putBlock(kvContainer, blockData);
blockManager.putBlock(kvContainer, blockData, incrKeyCount);

blockDataProto = blockData.getProtoBufMessage();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,20 @@ public BlockManagerImpl(ConfigurationSource conf) {
* @throws IOException
*/
public long putBlock(Container container, BlockData data) throws IOException {
return putBlock(container, data, true);
}
/**
* Puts or overwrites a block.
*
* @param container - Container for which block need to be added.
* @param data - BlockData.
* @param incrKeyCount - for FilePerBlockStrategy, increase key count only
* when the whole block file is written.
* @return length of the block.
* @throws IOException
*/
public long putBlock(Container container, BlockData data,
boolean incrKeyCount) throws IOException {
Preconditions.checkNotNull(data, "BlockData cannot be null for put " +
"operation.");
Preconditions.checkState(data.getContainerID() >= 0, "Container Id " +
Expand Down Expand Up @@ -129,14 +143,18 @@ public long putBlock(Container container, BlockData data) throws IOException {
Longs.toByteArray(container.getContainerData().getBytesUsed()));

// Set Block Count for a container.
batch.put(DB_BLOCK_COUNT_KEY,
Longs.toByteArray(container.getContainerData().getKeyCount() + 1));
if (incrKeyCount) {
batch.put(DB_BLOCK_COUNT_KEY,
Longs.toByteArray(container.getContainerData().getKeyCount() + 1));
}

db.getStore().writeBatch(batch);

container.updateBlockCommitSequenceId(bcsId);
// Increment block count finally here for in-memory.
container.getContainerData().incrKeyCount();
if (incrKeyCount) {
container.getContainerData().incrKeyCount();
}
if (LOG.isDebugEnabled()) {
LOG.debug(
"Block " + data.getBlockID() + " successfully committed with bcsId "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ public interface BlockManager {
*/
long putBlock(Container container, BlockData data) throws IOException;

/**
* Puts or overwrites a block.
*
* @param container - Container for which block need to be added.
* @param data - Block Data.
* @param incrKeyCount - Whether to increase container key count.
* @return length of the Block.
* @throws IOException
*/
long putBlock(Container container, BlockData data, boolean incrKeyCount)
throws IOException;

/**
* Gets an existing block.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,9 @@ public void commitTransactions(
final ContainerID containerId = ContainerID.valueof(
transactionResult.getContainerID());
if (dnsWithCommittedTxn == null) {
LOG.warn("Transaction txId={} commit by dnId={} for containerID={} "
+ "failed. Corresponding entry not found.", txID, dnID,
// Mostly likely it's a retried delete command response.
LOG.debug("Transaction txId={} commit by dnId={} for containerID={}"
+ " failed. Corresponding entry not found.", txID, dnID,
containerId);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ public void handlePendingDeletes(PendingDeleteStatusList deletionStatusList) {
DatanodeDetails dnDetails = deletionStatusList.getDatanodeDetails();
for (PendingDeleteStatusList.PendingDeleteStatus deletionStatus :
deletionStatusList.getPendingDeleteStatuses()) {
LOG.info(
"Block deletion txnID mismatch in datanode {} for containerID {}."
LOG.debug(
"Block deletion txnID behinds in datanode {} for containerID {}."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Block deletion txnID behinds in datanode {} for containerID {}."
"Block deletion txnID lagging in datanode {} for containerID {}."

+ " Datanode delete txnID: {}, SCM txnID: {}",
dnDetails.getUuid(), deletionStatus.getContainerId(),
deletionStatus.getDnDeleteTransactionId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ private void updateContainerStats(final ContainerID containerId,
containerInfo.updateSequenceId(
replicaProto.getBlockCommitSequenceId());
}

if (containerInfo.getUsedBytes() < replicaProto.getUsed()) {
containerInfo.setUsedBytes(replicaProto.getUsed());
}
Expand Down Expand Up @@ -228,6 +229,8 @@ private void updateContainerReplica(final DatanodeDetails datanodeDetails,
.setDatanodeDetails(datanodeDetails)
.setOriginNodeId(UUID.fromString(replicaProto.getOriginNodeId()))
.setSequenceId(replicaProto.getBlockCommitSequenceId())
.setKeyCount(replicaProto.getKeyCount())
.setBytesUsed(replicaProto.getUsed())
.build();

if (replica.getState().equals(State.DELETED)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,19 @@ public final class ContainerReplica implements Comparable<ContainerReplica> {
final private UUID placeOfBirth;

private Long sequenceId;
final private long keyCount;
final private long bytesUsed;


private ContainerReplica(final ContainerID containerID,
final ContainerReplicaProto.State state, final DatanodeDetails datanode,
final UUID originNodeId) {
final UUID originNodeId, long keyNum, long dataSize) {
this.containerID = containerID;
this.state = state;
this.datanodeDetails = datanode;
this.placeOfBirth = originNodeId;
this.keyCount = keyNum;
this.bytesUsed = dataSize;
}

private void setSequenceId(Long seqId) {
Expand Down Expand Up @@ -90,6 +94,24 @@ public Long getSequenceId() {
return sequenceId;
}

/**
* Returns the key count of of this replica.
*
* @return Key count
*/
public long getKeyCount() {
return keyCount;
}

/**
* Returns the data size of this replica.
*
* @return Data size
*/
public long getBytesUsed() {
return bytesUsed;
}

@Override
public int hashCode() {
return new HashCodeBuilder(61, 71)
Expand Down Expand Up @@ -141,6 +163,8 @@ public String toString() {
", datanodeDetails=" + datanodeDetails +
", placeOfBirth=" + placeOfBirth +
", sequenceId=" + sequenceId +
", keyCount=" + keyCount +
", bytesUsed=" + bytesUsed +
'}';
}

Expand All @@ -154,6 +178,8 @@ public static class ContainerReplicaBuilder {
private DatanodeDetails datanode;
private UUID placeOfBirth;
private Long sequenceId;
private long bytesUsed;
private long keyCount;

/**
* Set Container Id.
Expand Down Expand Up @@ -207,6 +233,16 @@ public ContainerReplicaBuilder setSequenceId(long seqId) {
return this;
}

public ContainerReplicaBuilder setKeyCount(long count) {
keyCount = count;
return this;
}

public ContainerReplicaBuilder setBytesUsed(long used) {
bytesUsed = used;
return this;
}

/**
* Constructs new ContainerReplicaBuilder.
*
Expand All @@ -221,11 +257,10 @@ public ContainerReplica build() {
"DatanodeDetails can't be null");
ContainerReplica replica = new ContainerReplica(
containerID, state, datanode,
Optional.ofNullable(placeOfBirth).orElse(datanode.getUuid()));
Optional.ofNullable(placeOfBirth).orElse(datanode.getUuid()),
keyCount, bytesUsed);
Optional.ofNullable(sequenceId).ifPresent(replica::setSequenceId);
return replica;
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,26 @@ private void processContainer(ContainerID id) {
action -> replicas.stream()
.noneMatch(r -> r.getDatanodeDetails().equals(action.datanode)));

if (state == LifeCycleState.CLOSED) {
// check container key count and bytes used
long maxUsedBytes = 0;
long maxKeyCount = 0;
ContainerReplica[] rps = replicas.toArray(new ContainerReplica[0]);
for (int i = 0; i < rps.length; i++) {
maxUsedBytes = Math.max(maxUsedBytes, rps[i].getBytesUsed());
maxKeyCount = Math.max(maxKeyCount, rps[i].getKeyCount());
LOG.info("Replica key count {}, bytes used {}",
rps[i].getKeyCount(), rps[i].getBytesUsed());
}
if (maxKeyCount < container.getNumberOfKeys()) {
container.setNumberOfKeys(maxKeyCount);
}
if (maxUsedBytes < container.getUsedBytes()) {
container.setUsedBytes(maxUsedBytes);
}
LOG.info("Container key count {}, bytes used {}",
container.getNumberOfKeys(), container.getUsedBytes());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please extract this block to a new method to keep the level of abstraction consistent.

}

/*
* We don't have to take any action if the container is healthy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
Expand Down Expand Up @@ -75,6 +77,22 @@ public static void closeContainers(
}, omKeyLocationInfoGroups);
}

/**
* Close all containers.
*
* @param eventPublisher event publisher.
* @param scm StorageContainerManager instance.
* @return true if close containers is successful.
* @throws IOException
*/
public static void closeAllContainers(EventPublisher eventPublisher,
StorageContainerManager scm) {
for (ContainerID containerID :
scm.getContainerManager().getContainerIDs()) {
eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
}
}

/**
* Performs the provided consumer on containers which contain the blocks
* listed in omKeyLocationInfoGroups.
Expand All @@ -85,7 +103,7 @@ public static void closeContainers(
*/
public static void performOperationOnKeyContainers(
CheckedConsumer<BlockID, Exception> consumer,
List<OmKeyLocationInfoGroup> omKeyLocationInfoGroups) throws Exception {
List<OmKeyLocationInfoGroup> omKeyLocationInfoGroups) throws Exception{

for (OmKeyLocationInfoGroup omKeyLocationInfoGroup :
omKeyLocationInfoGroups) {
Expand Down
Loading