diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java index a6fcc40dda17..8e4cc9fbf4db 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java @@ -46,6 +46,7 @@ import org.apache.hadoop.ozone.om.codec.OMDBDefinition; import org.apache.hadoop.ozone.om.response.CleanupTableInfo; import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Time; @@ -426,8 +427,12 @@ private String addToBatch(Queue buffer, BatchOperation batchOperation) { * in RocksDB callback flush. If multiple operations are flushed in one * specific batch, we are not sure at the flush of which specific operation * the callback is coming. - * There could be a possibility of race condition that is exposed to rocksDB - * behaviour for the batch. + * PurgeSnapshot is also considered a barrier, since a purgeSnapshot transaction is an idempotent operation when + * applied on its own. Once the directory gets deleted, the previous transactions that have been performed on the + * snapshotted RocksDB would start failing on replay, since those transactions have not been committed but the + * directory could have been partially or fully deleted. This could also lead to inconsistencies in the DB + * reads from the purged RocksDB if operations are not performed carefully. + * There could be a possibility of a race condition that is exposed to RocksDB behaviour for the batch. * Hence, we treat createSnapshot as separate batch flush. *

* e.g. requestBuffer = [request1, request2, snapshotRequest1, @@ -435,19 +440,17 @@ private String addToBatch(Queue buffer, BatchOperation batchOperation) { * response = [[request1, request2], [snapshotRequest1], [request3], * [snapshotRequest2], [request4]] */ - private List> splitReadyBufferAtCreateSnapshot() { + private synchronized List> splitReadyBufferAtCreateSnapshot() { final List> response = new ArrayList<>(); - OMResponse previousOmResponse = null; for (final Entry entry : readyBuffer) { OMResponse omResponse = entry.getResponse().getOMResponse(); // New queue gets created in three conditions: // 1. It is first element in the response, - // 2. Current request is createSnapshot request. - // 3. Previous request was createSnapshot request. - if (response.isEmpty() || omResponse.hasCreateSnapshotResponse() - || (previousOmResponse != null && - previousOmResponse.hasCreateSnapshotResponse())) { + // 2. Current request is createSnapshot/purgeSnapshot request. + // 3. Previous request was createSnapshot/purgeSnapshot request. 
+ if (response.isEmpty() || isStandaloneBatchCmdTypes(omResponse) + || isStandaloneBatchCmdTypes(previousOmResponse)) { response.add(new LinkedList<>()); } @@ -458,6 +461,15 @@ private List> splitReadyBufferAtCreateSnapshot() { return response; } + private static boolean isStandaloneBatchCmdTypes(OMResponse response) { + if (response == null) { + return false; + } + final OzoneManagerProtocolProtos.Type type = response.getCmdType(); + return type == OzoneManagerProtocolProtos.Type.SnapshotPurge + || type == OzoneManagerProtocolProtos.Type.CreateSnapshot; + } + private void addCleanupEntry(Entry entry, Map> cleanupEpochs) { Class responseClass = entry.getResponse().getClass(); @@ -612,7 +624,7 @@ int getCurrentBufferSize() { return currentBuffer.size(); } - int getReadyBufferSize() { + synchronized int getReadyBufferSize() { return readyBuffer.size(); } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java index 125c9efcaf2d..6e24c9ff93f8 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java @@ -44,9 +44,9 @@ import org.apache.hadoop.ozone.om.response.bucket.OMBucketCreateResponse; import org.apache.hadoop.ozone.om.response.key.OMKeyCreateResponse; import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotCreateResponse; +import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotPurgeResponse; import org.apache.hadoop.ozone.om.s3.S3SecretCacheProvider; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateSnapshotResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; 
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.util.KerberosName; @@ -81,12 +81,12 @@ class TestOzoneManagerDoubleBuffer { private OzoneManagerDoubleBuffer doubleBuffer; private OzoneManager ozoneManager; private S3SecretLockedManager secretManager; - private final CreateSnapshotResponse snapshotResponse1 = mock(CreateSnapshotResponse.class); - private final CreateSnapshotResponse snapshotResponse2 = mock(CreateSnapshotResponse.class); private final OMResponse omKeyResponse = mock(OMResponse.class); private final OMResponse omBucketResponse = mock(OMResponse.class); private final OMResponse omSnapshotResponse1 = mock(OMResponse.class); private final OMResponse omSnapshotResponse2 = mock(OMResponse.class); + private final OMResponse omSnapshotPurgeResponseProto1 = mock(OMResponse.class); + private final OMResponse omSnapshotPurgeResponseProto2 = mock(OMResponse.class); private static OMClientResponse omKeyCreateResponse = mock(OMKeyCreateResponse.class); private static OMClientResponse omBucketCreateResponse = @@ -95,6 +95,9 @@ class TestOzoneManagerDoubleBuffer { mock(OMSnapshotCreateResponse.class); private static OMClientResponse omSnapshotCreateResponse2 = mock(OMSnapshotCreateResponse.class); + private static OMClientResponse omSnapshotPurgeResponse1 = mock(OMSnapshotPurgeResponse.class); + private static OMClientResponse omSnapshotPurgeResponse2 = mock(OMSnapshotPurgeResponse.class); + @TempDir private File tempDir; private OzoneManagerDoubleBuffer.FlushNotifier flushNotifier; @@ -143,19 +146,22 @@ public void setup() throws IOException { doNothing().when(omBucketCreateResponse).checkAndUpdateDB(any(), any()); doNothing().when(omSnapshotCreateResponse1).checkAndUpdateDB(any(), any()); doNothing().when(omSnapshotCreateResponse2).checkAndUpdateDB(any(), any()); + doNothing().when(omSnapshotPurgeResponse1).checkAndUpdateDB(any(), any()); + doNothing().when(omSnapshotPurgeResponse2).checkAndUpdateDB(any(), 
any()); when(omKeyResponse.getTraceID()).thenReturn("keyTraceId"); when(omBucketResponse.getTraceID()).thenReturn("bucketTraceId"); when(omSnapshotResponse1.getTraceID()).thenReturn("snapshotTraceId-1"); when(omSnapshotResponse2.getTraceID()).thenReturn("snapshotTraceId-2"); - when(omSnapshotResponse1.hasCreateSnapshotResponse()) - .thenReturn(true); - when(omSnapshotResponse2.hasCreateSnapshotResponse()) - .thenReturn(true); - when(omSnapshotResponse1.getCreateSnapshotResponse()) - .thenReturn(snapshotResponse1); - when(omSnapshotResponse2.getCreateSnapshotResponse()) - .thenReturn(snapshotResponse2); + when(omSnapshotPurgeResponseProto1.getTraceID()).thenReturn("snapshotPurgeTraceId-1"); + when(omSnapshotPurgeResponseProto2.getTraceID()).thenReturn("snapshotPurgeTraceId-2"); + + when(omKeyResponse.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.CreateKey); + when(omBucketResponse.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.CreateBucket); + when(omSnapshotPurgeResponseProto1.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.SnapshotPurge); + when(omSnapshotPurgeResponseProto2.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.SnapshotPurge); + when(omSnapshotResponse1.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.CreateSnapshot); + when(omSnapshotResponse2.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.CreateSnapshot); when(omKeyCreateResponse.getOMResponse()).thenReturn(omKeyResponse); when(omBucketCreateResponse.getOMResponse()).thenReturn(omBucketResponse); @@ -163,6 +169,10 @@ public void setup() throws IOException { .thenReturn(omSnapshotResponse1); when(omSnapshotCreateResponse2.getOMResponse()) .thenReturn(omSnapshotResponse2); + when(omSnapshotPurgeResponse1.getOMResponse()) + .thenReturn(omSnapshotPurgeResponseProto1); + when(omSnapshotPurgeResponse2.getOMResponse()) + .thenReturn(omSnapshotPurgeResponseProto2); } @AfterEach @@ -194,8 +204,35 @@ private static Stream doubleBufferFlushCases() {
omSnapshotCreateResponse1, omSnapshotCreateResponse2, omBucketCreateResponse), - 4L, 4L, 14L, 16L, 1L, 1.142F) - ); + 4L, 4L, 14L, 16L, 1L, 1.142F), + Arguments.of(Arrays.asList(omSnapshotPurgeResponse1, + omSnapshotPurgeResponse2), + 2L, 2L, 16L, 18L, 1L, 1.125F), + Arguments.of(Arrays.asList(omKeyCreateResponse, + omBucketCreateResponse, + omSnapshotPurgeResponse1, + omSnapshotPurgeResponse2), + 3L, 4L, 19L, 22L, 2L, 1.157F), + Arguments.of(Arrays.asList(omKeyCreateResponse, + omSnapshotPurgeResponse1, + omBucketCreateResponse, + omSnapshotPurgeResponse2), + 4L, 4L, 23L, 26L, 1L, 1.1300F), + Arguments.of(Arrays.asList(omKeyCreateResponse, + omSnapshotPurgeResponse1, + omSnapshotPurgeResponse2, + omBucketCreateResponse), + 4L, 4L, 27L, 30L, 1L, 1.111F), + Arguments.of(Arrays.asList(omKeyCreateResponse, + omBucketCreateResponse, + omSnapshotPurgeResponse1, + omSnapshotCreateResponse1, + omSnapshotPurgeResponse2, + omBucketCreateResponse, + omSnapshotCreateResponse2), + 6L, 7L, 33L, 37L, 2L, 1.121F) + + ); } /**