@@ -21,6 +21,7 @@

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.NotImplementedException;
@@ -140,14 +141,22 @@ default CacheValue<VALUE> getCacheValue(CacheKey<KEY> cacheKey) {

/**
* Removes all the entries from the table cache which are having epoch value
* less
* than or equal to specified epoch value.
* less than or equal to specified epoch value.
* @param epoch
*/
default void cleanupCache(long epoch) {
throw new NotImplementedException("cleanupCache is not implemented");
}

/**
* Removes all entries from the table cache whose epoch value matches
* one of the epochs provided in the epoch list.
* @param epochs
*/
default void cleanupCache(List<Long> epochs) {
throw new NotImplementedException("cleanupCache is not implemented");
}

/**
* Return cache iterator maintained for this table.
*/
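A minimal usage sketch for the new list-based overload above. The table handle, key type, value type, and epoch values are illustrative assumptions, not part of this change; only cleanupCache(List<Long>) itself comes from the patch.

    // Assumed: "table" is some Table<String, String> whose implementation
    // (for example TypedTable) overrides cleanupCache(List<Long>).
    List<Long> flushedEpochs = new ArrayList<>();
    flushedEpochs.add(3L);
    flushedEpochs.add(5L);    // the epochs need not be contiguous
    flushedEpochs.add(9L);
    // Evicts only cache entries whose epoch appears in the list.
    table.cleanupCache(flushedEpochs);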
@@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import com.google.common.annotations.VisibleForTesting;
@@ -238,6 +239,11 @@ public void cleanupCache(long epoch) {
cache.cleanup(epoch);
}

@Override
public void cleanupCache(List<Long> epochs) {
cache.cleanup(epochs);
}

@VisibleForTesting
TableCache<CacheKey<KEY>, CacheValue<VALUE>> getCache() {
return cache;
@@ -23,6 +23,7 @@
import org.apache.hadoop.classification.InterfaceStability.Evolving;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
@@ -70,6 +71,17 @@ public interface TableCache<CACHEKEY extends CacheKey,
*/
void cleanup(long epoch);

/**
* Removes all entries from the cache whose epoch value matches one of the
* epochs provided in the epoch list.
*
* If the cleanup policy is NEVER, only entries that are marked for delete
* (null cache value) are removed.
* If the cleanup policy is MANUAL, it is the caller's responsibility to
* invoke this cleanup once the corresponding epochs have been flushed.
* @param epochs
*/
void cleanup(List<Long> epochs);
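A rough sketch of how the two cleanup policies behave for the list-based cleanup, driving a TableCacheImpl directly. The constructor usage is assumed from the test setup further below; keys, values, and epoch numbers are invented, Optional is com.google.common.base.Optional, and eviction happens asynchronously on a background executor, so entries disappear eventually rather than immediately.

    // MANUAL policy: every entry whose epoch appears in the (sorted) list is
    // evicted.
    TableCacheImpl<CacheKey<String>, CacheValue<String>> manualCache =
        new TableCacheImpl<>(TableCacheImpl.CacheCleanupPolicy.MANUAL);
    manualCache.put(new CacheKey<>("a"), new CacheValue<>(Optional.of("v1"), 1));
    manualCache.put(new CacheKey<>("b"), new CacheValue<>(Optional.of("v2"), 2));
    manualCache.cleanup(Arrays.asList(1L));    // "a" is evicted, "b" is kept

    // NEVER policy: only entries marked for delete (absent value) are evicted.
    TableCacheImpl<CacheKey<String>, CacheValue<String>> fullCache =
        new TableCacheImpl<>(TableCacheImpl.CacheCleanupPolicy.NEVER);
    fullCache.put(new CacheKey<>("c"), new CacheValue<>(Optional.of("v3"), 3));
    fullCache.put(new CacheKey<>("d"), new CacheValue<>(Optional.<String>absent(), 4));
    fullCache.cleanup(Arrays.asList(3L, 4L));  // "c" is kept, "d" is evicted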

/**
* Return the size of the cache.
* @return size
@@ -20,6 +20,7 @@
package org.apache.hadoop.hdds.utils.db.cache;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentHashMap;
@@ -93,7 +94,11 @@ public void put(CACHEKEY cacheKey, CACHEVALUE value) {

@Override
public void cleanup(long epoch) {
executorService.submit(() -> evictCache(epoch, cleanupPolicy));
executorService.submit(() -> evictCache(epoch));
}

@Override
public void cleanup(List<Long> epochs) {
executorService.submit(() -> evictCache(epochs));
}

@Override
@@ -106,7 +111,38 @@ public Iterator<Map.Entry<CACHEKEY, CACHEVALUE>> iterator() {
return cache.entrySet().iterator();
}

private void evictCache(long epoch, CacheCleanupPolicy cacheCleanupPolicy) {
private void evictCache(List<Long> epochs) {
EpochEntry<CACHEKEY> currentEntry;
long lastEpoch = epochs.get(epochs.size() - 1);
for (Iterator<EpochEntry<CACHEKEY>> iterator = epochEntries.iterator();
iterator.hasNext();) {
currentEntry = iterator.next();
CACHEKEY cachekey = currentEntry.getCachekey();
CacheValue cacheValue = cache.computeIfPresent(cachekey, ((k, v) -> {
if (cleanupPolicy == CacheCleanupPolicy.MANUAL) {
if (epochs.contains(v.getEpoch())) {
iterator.remove();
return null;
}
} else if (cleanupPolicy == CacheCleanupPolicy.NEVER) {
// Remove only entries which are marked for delete.
if (epochs.contains(v.getEpoch()) && v.getCacheValue() == null) {
iterator.remove();
return null;
}
}
return v;
}));

// Once a retained entry's epoch reaches the last (largest) epoch in the
// sorted list, no later entry can match, so we can break.
if (cacheValue != null && cacheValue.getEpoch() >= lastEpoch) {
break;
}
}
}
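The early break above relies on epochEntries being ordered by epoch (a NavigableSet, per the imports above) and on the caller passing a sorted epoch list. A stripped-down, hypothetical illustration of that traversal contract, separate from the patch itself:

    // Entries are visited in ascending epoch order, so once the traversal
    // passes the largest epoch in the sorted cleanup list, nothing later can
    // match and the scan can stop.
    NavigableSet<Long> orderedEpochs = new TreeSet<>(Arrays.asList(1L, 3L, 4L, 9L));
    List<Long> toEvict = Arrays.asList(3L, 4L);     // sorted, non-contiguous
    long lastEpoch = toEvict.get(toEvict.size() - 1);
    for (long e : orderedEpochs) {
      if (toEvict.contains(e)) {
        System.out.println("evict epoch " + e);     // prints 3 and 4
      }
      if (e >= lastEpoch) {
        break;                                      // 9 is never examined
      }
    }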

private void evictCache(long epoch) {
EpochEntry<CACHEKEY> currentEntry = null;
for (Iterator<EpochEntry<CACHEKEY>> iterator = epochEntries.iterator();
iterator.hasNext();) {
@@ -272,6 +272,8 @@ private OzoneConsts() {
public static final String MAX_PARTS = "maxParts";
public static final String S3_BUCKET = "s3Bucket";
public static final String S3_GETSECRET_USER = "S3GetSecretUser";
public static final String MULTIPART_UPLOAD_PART_NUMBER = "mpuPartNumber";
public static final String MULTIPART_UPLOAD_PART_NAME = "mpuPartName";



@@ -19,6 +19,7 @@

package org.apache.hadoop.hdds.utils.db.cache;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
@@ -88,6 +89,55 @@ public void testPartialTableCache() {
}


@Test
public void testPartialTableCacheWithNotContinousEntries() throws Exception {
int totalCount = 0;
int insertedCount = 3000;

int cleanupCount = 0;

ArrayList<Long> epochs = new ArrayList<>();
for (long i = 0; i < insertedCount; i += 2) {
if (cleanupCount++ < 1000) {
epochs.add(i);
}
tableCache.put(new CacheKey<>(Long.toString(i)),
new CacheValue<>(Optional.of(Long.toString(i)), i));
totalCount++;
}

Assert.assertEquals(totalCount, tableCache.size());

tableCache.cleanup(epochs);

final int count = totalCount;

// If the cleanup policy is MANUAL, the entries should have been removed.
if (cacheCleanupPolicy == TableCacheImpl.CacheCleanupPolicy.MANUAL) {
GenericTestUtils.waitFor(() -> {
return count - epochs.size() == tableCache.size();
}, 100, 10000);

// Check that the remaining entries still exist and that the deleted
// entries do not.
for (long i = 0; i < insertedCount; i += 2) {
if (!epochs.contains(i)) {
Assert.assertEquals(Long.toString(i),
tableCache.get(new CacheKey<>(Long.toString(i))).getCacheValue());
} else {
Assert.assertEquals(null,
tableCache.get(new CacheKey<>(Long.toString(i))));
}
}
} else {
for (long i = 0; i < insertedCount; i += 2) {
Assert.assertEquals(Long.toString(i),
tableCache.get(new CacheKey<>(Long.toString(i))).getCacheValue());
}
}

}

@Test
public void testPartialTableCacheParallel() throws Exception {

@@ -19,11 +19,13 @@
package org.apache.hadoop.ozone.om.ratis;

import java.io.IOException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -169,12 +171,20 @@ private void flushTransactions() {

long lastRatisTransactionIndex =
readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
.max(Long::compareTo).get();

readyBuffer.clear();
if (!isRatisEnabled) {
List<Long> flushedEpochs =
readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
.sorted().collect(Collectors.toList());

cleanupCache(flushedEpochs);
} else {
// cleanup cache.
cleanupCache(lastRatisTransactionIndex);
}
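A small sketch of what the new non-Ratis branch computes before the buffer is cleared. DoubleBufferEntry construction is omitted and the index values are invented; the point is only the shape of the stream pipeline:

    // Hypothetical transaction indexes flushed in one batch; they are not
    // necessarily contiguous.
    List<Long> trxIndexes = Arrays.asList(7L, 3L, 9L);

    // Sorted so that the last element is the largest epoch, which
    // evictCache(List) uses as its stop condition.
    List<Long> flushedEpochs =
        trxIndexes.stream().sorted().collect(Collectors.toList());
    // flushedEpochs == [3, 7, 9]; this list is what cleanupCache(List) gets.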

// cleanup cache.
cleanupCache(lastRatisTransactionIndex);
readyBuffer.clear();

// TODO: Need to revisit this logic, once we have multiple
// executors for volume/bucket request handling. As for now
@@ -236,6 +246,32 @@ private void cleanupCache(long lastRatisTransactionIndex) {

}

private void cleanupCache(List<Long> lastRatisTransactionIndex) {
// For now only volume and bucket transactions are handled, so strictly
// only those tables need a cleanupCache call here.
// TODO: After supporting all write operations, call cleanupCache on a
// table only when the buffer has entries for that table.
omMetadataManager.getBucketTable().cleanupCache(lastRatisTransactionIndex);
omMetadataManager.getVolumeTable().cleanupCache(lastRatisTransactionIndex);
omMetadataManager.getUserTable().cleanupCache(lastRatisTransactionIndex);

// TODO: A possible optimization: for key transactions, clean up the cache
// only on the key commit transaction. That way all intermediate
// transactions for a key are served from the in-memory cache.
omMetadataManager.getOpenKeyTable().cleanupCache(lastRatisTransactionIndex);
omMetadataManager.getKeyTable().cleanupCache(lastRatisTransactionIndex);
omMetadataManager.getDeletedTable().cleanupCache(lastRatisTransactionIndex);
omMetadataManager.getS3Table().cleanupCache(lastRatisTransactionIndex);
omMetadataManager.getMultipartInfoTable().cleanupCache(
lastRatisTransactionIndex);
omMetadataManager.getS3SecretTable().cleanupCache(
lastRatisTransactionIndex);
omMetadataManager.getDelegationTokenTable().cleanupCache(
lastRatisTransactionIndex);
omMetadataManager.getPrefixTable().cleanupCache(lastRatisTransactionIndex);

}

/**
* Update OzoneManagerDoubleBuffer metrics values.
* @param flushedTransactionsSize
@@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.om.request.s3.multipart;

import com.google.common.base.Optional;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.audit.OMAction;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OzoneManager;
@@ -32,6 +33,8 @@
import org.apache.hadoop.ozone.om.response.s3.multipart
.S3MultipartUploadCommitPartResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartCommitUploadPartRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
Expand All @@ -47,6 +50,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
@@ -84,8 +88,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
MultipartCommitUploadPartRequest multipartCommitUploadPartRequest =
getOmRequest().getCommitMultiPartUploadRequest();

OzoneManagerProtocolProtos.KeyArgs keyArgs =
multipartCommitUploadPartRequest.getKeyArgs();
KeyArgs keyArgs = multipartCommitUploadPartRequest.getKeyArgs();

String volumeName = keyArgs.getVolumeName();
String bucketName = keyArgs.getBucketName();
@@ -210,8 +213,9 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,

// audit log
auditLog(ozoneManager.getAuditLogger(), buildAuditMessage(
OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY, buildKeyArgsAuditMap(keyArgs),
exception, getOmRequest().getUserInfo()));
OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY,
buildAuditMap(keyArgs, partName), exception,
getOmRequest().getUserInfo()));

if (exception == null) {
LOG.debug("MultipartUpload Commit is successfully for Key:{} in " +
@@ -224,5 +228,16 @@ OMAction.COMMIT_MULTIPART_UPLOAD_PARTKEY, buildKeyArgsAuditMap(keyArgs),
}
return omClientResponse;
}

private Map<String, String> buildAuditMap(KeyArgs keyArgs, String partName) {
Map<String, String> auditMap = buildKeyArgsAuditMap(keyArgs);

// Add MPU related information.
auditMap.put(OzoneConsts.MULTIPART_UPLOAD_PART_NUMBER,
String.valueOf(keyArgs.getMultipartNumber()));
auditMap.put(OzoneConsts.MULTIPART_UPLOAD_PART_NAME, partName);

return auditMap;
}
}
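Illustrative only: the two extra entries the audit map picks up in buildAuditMap, keyed by the new OzoneConsts constants. The part number and part name values below are invented:

    Map<String, String> auditMap = new LinkedHashMap<>();
    auditMap.put(OzoneConsts.MULTIPART_UPLOAD_PART_NUMBER,
        String.valueOf(2));                          // key "mpuPartNumber"
    auditMap.put(OzoneConsts.MULTIPART_UPLOAD_PART_NAME,
        "partName-returned-by-this-commit");         // key "mpuPartName"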

@@ -184,8 +184,12 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,

if (partKeyInfo == null ||
!partName.equals(partKeyInfo.getPartName())) {
String omPartName = partKeyInfo == null ? null :
partKeyInfo.getPartName();
throw new OMException("Complete Multipart Upload Failed: volume: " +
volumeName + "bucket: " + bucketName + "key: " + keyName,
volumeName + "bucket: " + bucketName + "key: " + keyName +
". Provided Part info is { " + partName + ", " + partNumber +
"}, where as OM has partName " + omPartName,
OMException.ResultCodes.INVALID_PART);
}
