From cea0c007696ecf885116a596573a46195782b1d6 Mon Sep 17 00:00:00 2001
From: Bharat Viswanadham
Date: Fri, 22 Nov 2019 13:59:24 -0800
Subject: [PATCH 1/2] HDDS-2620. Fix listMultipartUploads API.

listMultipartUploads currently reads only the multipart info table from
the DB, so uploads whose entries are still in the table cache are missed,
and uploads that were aborted (deleted only in the cache) are still
listed. Change getMultipartUploadKeys to return a Set and merge the cache
entries with the DB entries, skipping keys whose cache value is marked
for delete.
---
 .../hadoop/ozone/om/OMMetadataManager.java    |   3 +-
 .../hadoop/ozone/om/KeyManagerImpl.java       |  12 +-
 .../ozone/om/OmMetadataManagerImpl.java       |  32 ++++-
 .../hadoop/ozone/om/TestKeyManagerUnit.java   | 122 ++++++++++++++++++
 4 files changed, 157 insertions(+), 12 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 3a661f0290d3..9a9cf8070f3d 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -18,6 +18,7 @@
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.common.BlockGroup;
@@ -352,6 +353,6 @@ long countEstimatedRowsInTable(Table table)
    * Return the existing upload keys which includes volumeName, bucketName,
    * keyName.
    */
-  List<String> getMultipartUploadKeys(String volumeName,
+  Set<String> getMultipartUploadKeys(String volumeName,
       String bucketName, String prefix) throws IOException;
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index e89dc64b2c33..d92f5ac18235 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -30,6 +30,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -1244,17 +1245,13 @@ public OmMultipartUploadList listMultipartUploads(String volumeName,
         bucketName);
     try {
-      List<String> multipartUploadKeys =
+      Set<String> multipartUploadKeys =
          metadataManager
              .getMultipartUploadKeys(volumeName, bucketName, prefix);
      List<OmMultipartUpload> collect = multipartUploadKeys.stream()
          .map(OmMultipartUpload::from)
          .peek(upload -> {
-            String dbKey = metadataManager
-                .getOzoneKey(upload.getVolumeName(),
-                    upload.getBucketName(),
-                    upload.getKeyName());
            try {
              Table<String, OmMultipartKeyInfo> keyInfoTable =
                  metadataManager.getMultipartInfoTable();
@@ -1271,7 +1268,10 @@ public OmMultipartUploadList listMultipartUploads(String volumeName,
            } catch (IOException e) {
              LOG.warn(
                  "Open key entry for multipart upload record can be read {}",
-                  dbKey);
+                  metadataManager
+                      .getOzoneKey(upload.getVolumeName(),
+                          upload.getBucketName(),
+                          upload.getKeyName()));
            }
          })
          .collect(Collectors.toList());
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 97b5d8cffd37..6d02a3df1056 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -930,21 +930,43 @@ public long countEstimatedRowsInTable(Table table)
   }
 
   @Override
-  public List<String> getMultipartUploadKeys(
+  public Set<String> getMultipartUploadKeys(
       String volumeName, String bucketName, String prefix) throws IOException {
-    List<String> response = new ArrayList<>();
-    TableIterator<String, ? extends KeyValue<String, OmMultipartKeyInfo>>
-        iterator = getMultipartInfoTable().iterator();
+    Set<String> response = new TreeSet<>();
+    Set<String> aborted = new TreeSet<>();
+
+    Iterator<Map.Entry<CacheKey<String>, CacheValue<OmMultipartKeyInfo>>>
+        cacheIterator = getMultipartInfoTable().cacheIterator();
     String prefixKey =
         OmMultipartUpload.getDbKey(volumeName, bucketName, prefix);
+
+    // First iterate all the entries in cache.
+    while (cacheIterator.hasNext()) {
+      Map.Entry<CacheKey<String>, CacheValue<OmMultipartKeyInfo>> cacheEntry =
+          cacheIterator.next();
+      if (cacheEntry.getKey().getCacheKey().startsWith(prefixKey)) {
+        // Check if it is marked for delete, due to abort mpu
+        if (cacheEntry.getValue().getCacheValue() != null) {
+          response.add(cacheEntry.getKey().getCacheKey());
+        } else {
+          aborted.add(cacheEntry.getKey().getCacheKey());
+        }
+      }
+    }
+
+    TableIterator<String, ? extends KeyValue<String, OmMultipartKeyInfo>>
+        iterator = getMultipartInfoTable().iterator();
     iterator.seek(prefixKey);
     while (iterator.hasNext()) {
       KeyValue<String, OmMultipartKeyInfo> entry = iterator.next();
       if (entry.getKey().startsWith(prefixKey)) {
-        response.add(entry.getKey());
+        // If it is marked for abort, skip it.
+        if (!aborted.contains(entry.getKey())) {
+          response.add(entry.getKey());
+        }
       } else {
         break;
       }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
index b00bf44d7eab..52de53c05a14 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerUnit.java
@@ -22,25 +22,35 @@
 import java.io.IOException;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
+import com.google.common.base.Optional;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.StorageType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs.Builder;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUpload;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.request.TestOMRequestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -124,6 +134,92 @@ public void listMultipartUploads() throws IOException {
         uploads.get(1).getCreationTime().compareTo(startDate) > 0);
   }
 
+  @Test
+  public void listMultipartUploadsWithFewEntriesInCache() throws IOException {
+    String volume = UUID.randomUUID().toString();
+    String bucket = UUID.randomUUID().toString();
+
+    //GIVEN
+    createBucket(metadataManager, volume, bucket);
+    createBucket(metadataManager, volume, bucket);
+
+
+    // Add few to cache and few to DB.
+    addinitMultipartUploadToCache(volume, bucket, "dir/key1");
+
+    initMultipartUpload(keyManager, volume, bucket, "dir/key2");
+
+    addinitMultipartUploadToCache(volume, bucket, "dir/key3");
+
+    initMultipartUpload(keyManager, volume, bucket, "dir/key4");
+
+    //WHEN
+    OmMultipartUploadList omMultipartUploadList =
+        keyManager.listMultipartUploads(volume, bucket, "");
+
+    //THEN
+    List<OmMultipartUpload> uploads = omMultipartUploadList.getUploads();
+    Assert.assertEquals(4, uploads.size());
+    Assert.assertEquals("dir/key1", uploads.get(0).getKeyName());
+    Assert.assertEquals("dir/key2", uploads.get(1).getKeyName());
+    Assert.assertEquals("dir/key3", uploads.get(2).getKeyName());
+    Assert.assertEquals("dir/key4", uploads.get(3).getKeyName());
+
+    // Add few more to test prefix.
+
+    // Same way add few to cache and few to DB.
+    addinitMultipartUploadToCache(volume, bucket, "dir/ozonekey1");
+
+    initMultipartUpload(keyManager, volume, bucket, "dir/ozonekey2");
+
+    OmMultipartInfo omMultipartInfo3 = addinitMultipartUploadToCache(volume,
+        bucket, "dir/ozonekey3");
+
+    OmMultipartInfo omMultipartInfo4 = initMultipartUpload(keyManager,
+        volume, bucket, "dir/ozonekey4");
+
+    omMultipartUploadList =
+        keyManager.listMultipartUploads(volume, bucket, "dir/ozone");
+
+    //THEN
+    uploads = omMultipartUploadList.getUploads();
+    Assert.assertEquals(4, uploads.size());
+    Assert.assertEquals("dir/ozonekey1", uploads.get(0).getKeyName());
+    Assert.assertEquals("dir/ozonekey2", uploads.get(1).getKeyName());
+    Assert.assertEquals("dir/ozonekey3", uploads.get(2).getKeyName());
+    Assert.assertEquals("dir/ozonekey4", uploads.get(3).getKeyName());
+
+    // Abort multipart upload for key in DB.
+    abortMultipart(volume, bucket, "dir/ozonekey4",
+        omMultipartInfo4.getUploadID());
+
+    // Now list.
+    omMultipartUploadList =
+        keyManager.listMultipartUploads(volume, bucket, "dir/ozone");
+
+    //THEN
+    uploads = omMultipartUploadList.getUploads();
+    Assert.assertEquals(3, uploads.size());
+    Assert.assertEquals("dir/ozonekey1", uploads.get(0).getKeyName());
+    Assert.assertEquals("dir/ozonekey2", uploads.get(1).getKeyName());
+    Assert.assertEquals("dir/ozonekey3", uploads.get(2).getKeyName());
+
+    // abort multipart upload for key in cache.
+    abortMultipart(volume, bucket, "dir/ozonekey3",
+        omMultipartInfo3.getUploadID());
+
+    // Now list.
+    omMultipartUploadList =
+        keyManager.listMultipartUploads(volume, bucket, "dir/ozone");
+
+    //THEN
+    uploads = omMultipartUploadList.getUploads();
+    Assert.assertEquals(2, uploads.size());
+    Assert.assertEquals("dir/ozonekey1", uploads.get(0).getKeyName());
+    Assert.assertEquals("dir/ozonekey2", uploads.get(1).getKeyName());
+
+  }
+
   @Test
   public void listMultipartUploadsWithPrefix() throws IOException {
 
@@ -177,4 +273,30 @@ private OmMultipartInfo initMultipartUpload(KeyManagerImpl omtest,
         .build();
     return omtest.initiateMultipartUpload(key1);
   }
+
+  private OmMultipartInfo addinitMultipartUploadToCache(
+      String volume, String bucket, String key) {
+    Map<Integer, OzoneManagerProtocolProtos.PartKeyInfo> partKeyInfoMap =
+        new HashMap<>();
+    String uploadID = UUID.randomUUID().toString();
+    OmMultipartKeyInfo multipartKeyInfo = new OmMultipartKeyInfo(
+        uploadID, Time.now(), ReplicationType.RATIS,
+        ReplicationFactor.THREE, partKeyInfoMap);
+    metadataManager.getMultipartInfoTable().addCacheEntry(
+        new CacheKey<>(metadataManager.getMultipartKey(volume, bucket, key,
+            uploadID)), new CacheValue<>(Optional.of(multipartKeyInfo),
+            RandomUtils.nextInt()));
+    return new OmMultipartInfo(volume, bucket, key, uploadID);
+  }
+
+  private void abortMultipart(
+      String volume, String bucket, String key, String uploadID) {
+    Map<Integer, OzoneManagerProtocolProtos.PartKeyInfo> partKeyInfoMap =
+        new HashMap<>();
+    metadataManager.getMultipartInfoTable().addCacheEntry(
+        new CacheKey<>(metadataManager.getMultipartKey(volume, bucket, key,
+            uploadID)), new CacheValue<>(Optional.absent(),
+            RandomUtils.nextInt()));
+  }
+
 }

From 9d26870b7745a3a04d9e5b1e1399b1d25a22e775 Mon Sep 17 00:00:00 2001
From: Bharat Viswanadham
Date: Fri, 22 Nov 2019 14:01:30 -0800
Subject: [PATCH 2/2] minor
---
 .../java/org/apache/hadoop/ozone/om/KeyManagerImpl.java | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index d92f5ac18235..8349601c95f8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -1268,10 +1268,8 @@ public OmMultipartUploadList listMultipartUploads(String volumeName,
            } catch (IOException e) {
              LOG.warn(
                  "Open key entry for multipart upload record can be read {}",
-                  metadataManager
-                      .getOzoneKey(upload.getVolumeName(),
-                          upload.getBucketName(),
-                          upload.getKeyName()));
+                  metadataManager.getOzoneKey(upload.getVolumeName(),
+                      upload.getBucketName(), upload.getKeyName()));
            }
          })
          .collect(Collectors.toList());