Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.lang3.RandomStringUtils;
Expand Down Expand Up @@ -65,6 +66,7 @@
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;

import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequest;
import org.apache.hadoop.ozone.om.request.key.OMKeyCommitRequestWithFSO;
Expand All @@ -74,20 +76,25 @@
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -100,6 +107,7 @@
* Test HSync.
*/
@Timeout(value = 300)
@TestMethodOrder(OrderAnnotation.class)
public class TestHSync {
private static final Logger LOG =
LoggerFactory.getLogger(TestHSync.class);
Expand All @@ -109,18 +117,21 @@ public class TestHSync {

private static final OzoneConfiguration CONF = new OzoneConfiguration();
private static OzoneClient client;
private static final BucketLayout BUCKET_LAYOUT = BucketLayout.FILE_SYSTEM_OPTIMIZED;

@BeforeAll
public static void init() throws Exception {
final int chunkSize = 4 << 10;
final int flushSize = 2 * chunkSize;
final int maxFlushSize = 2 * flushSize;
final int blockSize = 2 * maxFlushSize;
final BucketLayout layout = BucketLayout.FILE_SYSTEM_OPTIMIZED;
final BucketLayout layout = BUCKET_LAYOUT;

CONF.setBoolean(OZONE_OM_RATIS_ENABLE_KEY, false);
CONF.set(OZONE_DEFAULT_BUCKET_LAYOUT, layout.name());
CONF.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
// Reduce KeyDeletingService interval
CONF.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS);
cluster = MiniOzoneCluster.newBuilder(CONF)
.setNumDatanodes(5)
.setTotalPipelineNumLimit(10)
Expand Down Expand Up @@ -153,6 +164,83 @@ public static void teardown() {
}
}

@Test
// Making this the first test to be run to avoid db key composition headaches
@Order(1)
public void testKeyMetadata() throws Exception {
// Tests key metadata behavior upon create(), hsync() and close():
// 1. When a key is create()'d, neither OpenKeyTable nor KeyTable entry shall have hsync metadata.
// 2. When the key is hsync()'ed, both OpenKeyTable and KeyTable shall have hsync metadata.
// 3. When the key is hsync()'ed again, both OpenKeyTable and KeyTable shall have hsync metadata.
// 4. When the key is close()'d, KeyTable entry shall not have hsync metadata. Key shall not exist in OpenKeyTable.

final String rootPath = String.format("%s://%s/",
OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);

final String dir = OZONE_ROOT + bucket.getVolumeName()
+ OZONE_URI_DELIMITER + bucket.getName();
final String keyName = "file-test-key-metadata";
final Path file = new Path(dir, keyName);

OMMetadataManager omMetadataManager =
cluster.getOzoneManager().getMetadataManager();

// Expect empty OpenKeyTable and KeyTable before key creation
Table<String, OmKeyInfo> openKeyTable = omMetadataManager.getOpenKeyTable(BUCKET_LAYOUT);
assertTrue(openKeyTable.isEmpty());
Table<String, OmKeyInfo> keyTable = omMetadataManager.getKeyTable(BUCKET_LAYOUT);
assertTrue(keyTable.isEmpty());

try (FileSystem fs = FileSystem.get(CONF)) {
try (FSDataOutputStream os = fs.create(file, true)) {
// Wait for double buffer flush to avoid flakiness because RDB iterator bypasses table cache
cluster.getOzoneManager().awaitDoubleBufferFlush();
// OpenKeyTable key should NOT have HSYNC_CLIENT_ID
OmKeyInfo keyInfo = getFirstKeyInTable(keyName, openKeyTable);
assertFalse(keyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID));
// KeyTable should still be empty
assertTrue(keyTable.isEmpty());

os.hsync();
cluster.getOzoneManager().awaitDoubleBufferFlush();
// OpenKeyTable key should have HSYNC_CLIENT_ID now
keyInfo = getFirstKeyInTable(keyName, openKeyTable);
assertTrue(keyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID));
// KeyTable key should be there and have HSYNC_CLIENT_ID
keyInfo = getFirstKeyInTable(keyName, keyTable);
assertTrue(keyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID));

// hsync again, metadata should not change
os.hsync();
cluster.getOzoneManager().awaitDoubleBufferFlush();
keyInfo = getFirstKeyInTable(keyName, openKeyTable);
assertTrue(keyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID));
keyInfo = getFirstKeyInTable(keyName, keyTable);
assertTrue(keyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID));
}
// key is closed, OpenKeyTable should be empty
cluster.getOzoneManager().awaitDoubleBufferFlush();
assertTrue(openKeyTable.isEmpty());
// KeyTable should have the key. But the key shouldn't have metadata HSYNC_CLIENT_ID anymore
OmKeyInfo keyInfo = getFirstKeyInTable(keyName, keyTable);
assertFalse(keyInfo.getMetadata().containsKey(OzoneConsts.HSYNC_CLIENT_ID));

// Clean up
assertTrue(fs.delete(file, false));
// Wait for KeyDeletingService to finish to avoid interfering other tests
Table<String, RepeatedOmKeyInfo> deletedTable = omMetadataManager.getDeletedTable();
GenericTestUtils.waitFor(
() -> {
try {
return deletedTable.isEmpty();
} catch (IOException e) {
return false;
}
}, 250, 10000);
}
}

@Test
public void testKeyHSyncThenClose() throws Exception {
// Check that deletedTable should not have keys with the same block as in
Expand Down Expand Up @@ -556,6 +644,23 @@ public void testDisableHsync() throws Exception {
}
}

/**
* Helper method to check and get the first key in the OpenKeyTable.
* @param keyName expect key name to contain this string
* @param openKeyTable Table<String, OmKeyInfo>
* @return OmKeyInfo
*/
private OmKeyInfo getFirstKeyInTable(String keyName, Table<String, OmKeyInfo> openKeyTable) throws IOException {
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> it = openKeyTable.iterator()) {
assertTrue(it.hasNext());
Table.KeyValue<String, OmKeyInfo> kv = it.next();
String dbOpenKey = kv.getKey();
assertNotNull(dbOpenKey);
assertTrue(dbOpenKey.contains(keyName));
return kv.getValue();
}
}

private void testEncryptedStreamCapabilities(boolean isEC) throws IOException,
GeneralSecurityException {
KeyOutputStream kos;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,14 @@ public class TestLeaseRecovery {
* is no longer open in OM. This is currently expected (see HDDS-9358).
*/
public static void closeIgnoringKeyNotFound(OutputStream stream) {
closeIgnoringOMException(stream, OMException.ResultCodes.KEY_NOT_FOUND);
}

public static void closeIgnoringOMException(OutputStream stream, OMException.ResultCodes expectedResultCode) {
try {
stream.close();
} catch (IOException e) {
assertEquals(OMException.ResultCodes.KEY_NOT_FOUND, ((OMException)e).getResult());
assertEquals(expectedResultCode, ((OMException)e).getResult());
}
}

Expand Down Expand Up @@ -327,7 +331,11 @@ public void testGetCommittedBlockLengthTimeout(boolean forceRecovery) throws Exc
// Since all DNs are out, then the length in OM keyInfo will be used as the final file length
assertEquals(dataSize, fileStatus.getLen());
} finally {
closeIgnoringKeyNotFound(stream);
if (!forceRecovery) {
closeIgnoringOMException(stream, OMException.ResultCodes.KEY_UNDER_LEASE_RECOVERY);
} else {
closeIgnoringKeyNotFound(stream);
}
KeyValueHandler.setInjector(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import static org.apache.hadoop.ozone.OzoneConsts.DB_TRANSIENT_MARKER;
import static org.apache.hadoop.ozone.OzoneConsts.HSYNC_CLIENT_ID;
import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_LIMIT;
Expand Down Expand Up @@ -1207,10 +1206,8 @@ public ListOpenFilesResult listOpenFiles(BucketLayout bucketLayout,
// listKeys do. But that complicates the iteration logic by quite a bit.
// And if we do that, we need to refactor listKeys as well to dedup.

final Table<String, OmKeyInfo> okTable, kTable;
final Table<String, OmKeyInfo> okTable;
okTable = getOpenKeyTable(bucketLayout);
// keyTable required to check key hsync metadata. TODO: HDDS-10077
kTable = getKeyTable(bucketLayout);

// No lock required since table iterator creates a "snapshot"
try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
Expand All @@ -1228,13 +1225,7 @@ public ListOpenFilesResult listOpenFiles(BucketLayout bucketLayout,
String dbKey = kv.getKey();
long clientID = OMMetadataManager.getClientIDFromOpenKeyDBKey(dbKey);
OmKeyInfo omKeyInfo = kv.getValue();

// Trim client ID to get the keyTable dbKey
int lastSlashIdx = dbKey.lastIndexOf(OM_KEY_PREFIX);
String ktDbKey = dbKey.substring(0, lastSlashIdx);
// Check whether the key has been hsync'ed by checking keyTable
checkAndUpdateKeyHsyncStatus(omKeyInfo, ktDbKey, kTable);

// Note with HDDS-10077, there is no need to check KeyTable for hsync metadata
openKeySessionList.add(
new OpenKeySession(clientID, omKeyInfo,
omKeyInfo.getLatestVersionLocations().getVersion()));
Expand All @@ -1261,23 +1252,6 @@ public ListOpenFilesResult listOpenFiles(BucketLayout bucketLayout,
openKeySessionList);
}

/**
* Check and update OmKeyInfo from OpenKeyTable with hsync status in KeyTable.
*/
private void checkAndUpdateKeyHsyncStatus(OmKeyInfo omKeyInfo,
String dbKey,
Table<String, OmKeyInfo> kTable)
throws IOException {
OmKeyInfo ktOmKeyInfo = kTable.get(dbKey);
if (ktOmKeyInfo != null) {
// The same key in OpenKeyTable also exists in KeyTable, indicating
// the key has been hsync'ed
String hsyncClientId = ktOmKeyInfo.getMetadata().get(HSYNC_CLIENT_ID);
// Append HSYNC_CLIENT_ID to OmKeyInfo to be returned to the client
omKeyInfo.getMetadata().put(HSYNC_CLIENT_ID, hsyncClientId);
}
}

@Override
public ListKeysResult listKeys(String volumeName, String bucketName,
String startKey, String keyPrefix, int maxKeys)
Expand Down Expand Up @@ -1878,8 +1852,7 @@ public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
final String clientIdString
= dbOpenKeyName.substring(lastPrefix + 1);

final OmKeyInfo info = kt.get(dbKeyName);
final boolean isHsync = java.util.Optional.ofNullable(info)
final boolean isHsync = java.util.Optional.of(openKeyInfo)
.map(WithMetadata::getMetadata)
.map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID))
.filter(id -> id.equals(clientIdString))
Expand All @@ -1892,6 +1865,7 @@ public ExpiredOpenKeys getExpiredOpenKeys(Duration expireThreshold,
} else if (isHsync && openKeyInfo.getModificationTime() <= expiredLeaseTimestamp &&
!openKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY)) {
// add hsync'ed keys
final OmKeyInfo info = kt.get(dbKeyName);
final KeyArgs.Builder keyArgs = KeyArgs.newBuilder()
.setVolumeName(info.getVolumeName())
.setBucketName(info.getBucketName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@

import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.audit.OMAction;
import org.apache.hadoop.ozone.om.OMMetadataManager;
Expand Down Expand Up @@ -238,7 +236,7 @@ private RecoverLeaseResponse doWork(OzoneManager ozoneManager,
openKeyInfo.setModificationTime(Time.now());
// add to cache.
omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
new CacheKey<>(dbOpenFileKey), CacheValue.get(transactionLogIndex, openKeyInfo));
dbOpenFileKey, openKeyInfo, transactionLogIndex);
}
// override key name with normalizedKeyPath
keyInfo.setKeyName(keyName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.WithMetadata;
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
import org.apache.hadoop.ozone.om.request.util.OmKeyHSyncUtil;
import org.apache.hadoop.ozone.om.request.validation.RequestFeatureValidator;
import org.apache.hadoop.ozone.om.request.validation.RequestProcessingPhase;
import org.apache.hadoop.ozone.om.request.validation.ValidationCondition;
Expand Down Expand Up @@ -81,8 +82,7 @@
public class OMKeyCommitRequest extends OMKeyRequest {

@VisibleForTesting
public static final Logger LOG =
LoggerFactory.getLogger(OMKeyCommitRequest.class);
public static final Logger LOG = LoggerFactory.getLogger(OMKeyCommitRequest.class);

public OMKeyCommitRequest(OMRequest omRequest, BucketLayout bucketLayout) {
super(omRequest, bucketLayout);
Expand Down Expand Up @@ -237,7 +237,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn

final String clientIdString = String.valueOf(writerClientId);
if (null != keyToDelete) {
isPreviousCommitHsync = java.util.Optional.ofNullable(keyToDelete)
isPreviousCommitHsync = java.util.Optional.of(keyToDelete)
.map(WithMetadata::getMetadata)
.map(meta -> meta.get(OzoneConsts.HSYNC_CLIENT_ID))
.filter(id -> id.equals(clientIdString))
Expand All @@ -260,17 +260,24 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
" metadata while recovery flag is not set in request", KEY_UNDER_LEASE_RECOVERY);
}
}
omKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf(
commitKeyArgs.getMetadataList()));

omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());

// non-null indicates it is necessary to update the open key
OmKeyInfo newOpenKeyInfo = null;

if (isHSync) {
omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID, clientIdString);
} else if (isRecovery) {
omKeyInfo.getMetadata().remove(OzoneConsts.HSYNC_CLIENT_ID);
omKeyInfo.getMetadata().remove(OzoneConsts.LEASE_RECOVERY);
if (!OmKeyHSyncUtil.isHSyncedPreviously(omKeyInfo, clientIdString, dbOpenKey)) {
// Update open key as well if it is the first hsync of this key
omKeyInfo.getMetadata().put(OzoneConsts.HSYNC_CLIENT_ID, clientIdString);
newOpenKeyInfo = omKeyInfo.copyObject();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The modification time should be updated for newOpenKeyInfo.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Then the mod time in open key would indicate the first time the key is hsync'ed (since open key will only be updated on the first hsync).

Done.

}
}

omKeyInfo.getMetadata().putAll(KeyValueUtil.getFromProtobuf(
commitKeyArgs.getMetadataList()));
omKeyInfo.setDataSize(commitKeyArgs.getDataSize());
omKeyInfo.setModificationTime(commitKeyArgs.getModificationTime());

// Update the block length for each block, return the allocated but
// uncommitted blocks
List<OmKeyLocationInfo> uncommitted =
Expand Down Expand Up @@ -337,6 +344,16 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn
// So that this key can't be committed again.
omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
dbOpenKey, trxnLogIndex);

// Prevent hsync metadata from getting committed to the final key
omKeyInfo.getMetadata().remove(OzoneConsts.HSYNC_CLIENT_ID);
if (isRecovery) {
omKeyInfo.getMetadata().remove(OzoneConsts.LEASE_RECOVERY);
}
} else if (newOpenKeyInfo != null) {
// isHSync is true and newOpenKeyInfo is set, update OpenKeyTable
omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
dbOpenKey, newOpenKeyInfo, trxnLogIndex);
}

omMetadataManager.getKeyTable(getBucketLayout()).addCacheEntry(
Expand All @@ -346,7 +363,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, TermIn

omClientResponse = new OMKeyCommitResponse(omResponse.build(),
omKeyInfo, dbOzoneKey, dbOpenKey, omBucketInfo.copyObject(),
oldKeyVersionsToDeleteMap, isHSync);
oldKeyVersionsToDeleteMap, isHSync, newOpenKeyInfo);

result = Result.SUCCESS;
} catch (IOException | InvalidPathException ex) {
Expand Down
Loading