OzoneManagerDoubleBuffer.java
@@ -46,6 +46,7 @@
import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
@@ -426,28 +427,30 @@ private String addToBatch(Queue<Entry> buffer, BatchOperation batchOperation) {
* in RocksDB callback flush. If multiple operations are flushed in one
* specific batch, we cannot tell at the callback which specific operation's
* flush it corresponds to, so the batch is exposed to a possible race
* condition in RocksDB behaviour.
* PurgeSnapshot is also considered a barrier, since a purgeSnapshot transaction on its own is an
* idempotent operation. Once the snapshot directory gets deleted, the previous transactions that were performed on
* the snapshotted RocksDB would start failing on replay, because those transactions have not been committed while
* the directory could have been partially or fully deleted. This could also lead to inconsistent reads from the
* purged RocksDB if operations are not performed carefully.
* Hence, we treat createSnapshot and purgeSnapshot as separate batch flushes
* (see the standalone splitting sketch after the isStandaloneBatchCmdTypes helper below).
* <p>
* e.g. requestBuffer = [request1, request2, snapshotRequest1,
* request3, snapshotRequest2, request4]
* response = [[request1, request2], [snapshotRequest1], [request3],
* [snapshotRequest2], [request4]]
*/
private List<Queue<Entry>> splitReadyBufferAtCreateSnapshot() {
private synchronized List<Queue<Entry>> splitReadyBufferAtCreateSnapshot() {
final List<Queue<Entry>> response = new ArrayList<>();

OMResponse previousOmResponse = null;
for (final Entry entry : readyBuffer) {
OMResponse omResponse = entry.getResponse().getOMResponse();
// New queue gets created in three conditions:
// 1. It is the first element in the response,
// 2. Current request is createSnapshot request.
// 3. Previous request was createSnapshot request.
if (response.isEmpty() || omResponse.hasCreateSnapshotResponse()
|| (previousOmResponse != null &&
previousOmResponse.hasCreateSnapshotResponse())) {
// 2. Current request is createSnapshot/purgeSnapshot request.
// 3. Previous request was createSnapshot/purgeSnapshot request.
if (response.isEmpty() || isStandaloneBatchCmdTypes(omResponse)
|| isStandaloneBatchCmdTypes(previousOmResponse)) {
response.add(new LinkedList<>());
}

@@ -458,6 +461,15 @@ private List<Queue<Entry>> splitReadyBufferAtCreateSnapshot() {
return response;
}

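/**
 * Returns true if the given response's command type must be flushed in its own batch
 * (currently CreateSnapshot and SnapshotPurge).
 */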
private static boolean isStandaloneBatchCmdTypes(OMResponse response) {
if (response == null) {
return false;
}
final OzoneManagerProtocolProtos.Type type = response.getCmdType();
return type == OzoneManagerProtocolProtos.Type.SnapshotPurge
|| type == OzoneManagerProtocolProtos.Type.CreateSnapshot;
}
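
For illustration, here is a minimal standalone sketch (not part of the patch) of the splitting rule described in the Javadoc above: a new sub-batch starts at the first entry, at every CreateSnapshot/SnapshotPurge entry, and immediately after one. The names BatchSplitSketch, Cmd, and split are hypothetical and only stand in for readyBuffer, OzoneManagerProtocolProtos.Type, and splitReadyBufferAtCreateSnapshot; a plain enum replaces the proto types so the sketch compiles on its own.

import java.util.ArrayList;
import java.util.List;

public class BatchSplitSketch {
  enum Cmd { CreateKey, CreateBucket, CreateSnapshot, SnapshotPurge }

  // Mirrors isStandaloneBatchCmdTypes: these command types must be flushed alone.
  static boolean isStandalone(Cmd cmd) {
    return cmd == Cmd.CreateSnapshot || cmd == Cmd.SnapshotPurge;
  }

  // Mirrors splitReadyBufferAtCreateSnapshot: start a new sub-batch for the first
  // entry, for every standalone entry, and right after a standalone entry.
  static List<List<Cmd>> split(List<Cmd> readyBuffer) {
    List<List<Cmd>> batches = new ArrayList<>();
    Cmd previous = null;
    for (Cmd current : readyBuffer) {
      if (batches.isEmpty() || isStandalone(current)
          || (previous != null && isStandalone(previous))) {
        batches.add(new ArrayList<>());
      }
      batches.get(batches.size() - 1).add(current);
      previous = current;
    }
    return batches;
  }

  public static void main(String[] args) {
    // Same shape as the Javadoc example: [request1, request2, snapshotRequest1, ...]
    List<Cmd> buffer = List.of(Cmd.CreateKey, Cmd.CreateBucket, Cmd.CreateSnapshot,
        Cmd.CreateKey, Cmd.SnapshotPurge, Cmd.CreateBucket);
    // Prints [[CreateKey, CreateBucket], [CreateSnapshot], [CreateKey], [SnapshotPurge], [CreateBucket]]
    System.out.println(split(buffer));
  }
}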

private void addCleanupEntry(Entry entry, Map<String, List<Long>> cleanupEpochs) {
Class<? extends OMClientResponse> responseClass =
entry.getResponse().getClass();
@@ -612,7 +624,7 @@ int getCurrentBufferSize() {
return currentBuffer.size();
}

int getReadyBufferSize() {
synchronized int getReadyBufferSize() {
return readyBuffer.size();
}

TestOzoneManagerDoubleBuffer.java
@@ -44,9 +44,9 @@
import org.apache.hadoop.ozone.om.response.bucket.OMBucketCreateResponse;
import org.apache.hadoop.ozone.om.response.key.OMKeyCreateResponse;
import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotCreateResponse;
import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotPurgeResponse;
import org.apache.hadoop.ozone.om.s3.S3SecretCacheProvider;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateSnapshotResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.util.KerberosName;
@@ -81,12 +81,12 @@ class TestOzoneManagerDoubleBuffer {
private OzoneManagerDoubleBuffer doubleBuffer;
private OzoneManager ozoneManager;
private S3SecretLockedManager secretManager;
private final CreateSnapshotResponse snapshotResponse1 = mock(CreateSnapshotResponse.class);
private final CreateSnapshotResponse snapshotResponse2 = mock(CreateSnapshotResponse.class);
private final OMResponse omKeyResponse = mock(OMResponse.class);
private final OMResponse omBucketResponse = mock(OMResponse.class);
private final OMResponse omSnapshotResponse1 = mock(OMResponse.class);
private final OMResponse omSnapshotResponse2 = mock(OMResponse.class);
private final OMResponse omSnapshotPurgeResponseProto1 = mock(OMResponse.class);
private final OMResponse omSnapshotPurgeResponseProto2 = mock(OMResponse.class);
private static OMClientResponse omKeyCreateResponse =
mock(OMKeyCreateResponse.class);
private static OMClientResponse omBucketCreateResponse =
@@ -95,6 +95,9 @@ class TestOzoneManagerDoubleBuffer {
mock(OMSnapshotCreateResponse.class);
private static OMClientResponse omSnapshotCreateResponse2 =
mock(OMSnapshotCreateResponse.class);
private static OMClientResponse omSnapshotPurgeResponse1 = mock(OMSnapshotPurgeResponse.class);
private static OMClientResponse omSnapshotPurgeResponse2 = mock(OMSnapshotPurgeResponse.class);

@TempDir
private File tempDir;
private OzoneManagerDoubleBuffer.FlushNotifier flushNotifier;
@@ -143,26 +146,33 @@ public void setup() throws IOException {
doNothing().when(omBucketCreateResponse).checkAndUpdateDB(any(), any());
doNothing().when(omSnapshotCreateResponse1).checkAndUpdateDB(any(), any());
doNothing().when(omSnapshotCreateResponse2).checkAndUpdateDB(any(), any());
doNothing().when(omSnapshotPurgeResponse1).checkAndUpdateDB(any(), any());
doNothing().when(omSnapshotPurgeResponse2).checkAndUpdateDB(any(), any());

when(omKeyResponse.getTraceID()).thenReturn("keyTraceId");
when(omBucketResponse.getTraceID()).thenReturn("bucketTraceId");
when(omSnapshotResponse1.getTraceID()).thenReturn("snapshotTraceId-1");
when(omSnapshotResponse2.getTraceID()).thenReturn("snapshotTraceId-2");
when(omSnapshotResponse1.hasCreateSnapshotResponse())
.thenReturn(true);
when(omSnapshotResponse2.hasCreateSnapshotResponse())
.thenReturn(true);
when(omSnapshotResponse1.getCreateSnapshotResponse())
.thenReturn(snapshotResponse1);
when(omSnapshotResponse2.getCreateSnapshotResponse())
.thenReturn(snapshotResponse2);
when(omSnapshotPurgeResponseProto1.getTraceID()).thenReturn("snapshotPurgeTraceId-1");
when(omSnapshotPurgeResponseProto2.getTraceID()).thenReturn("snapshotPurgeTraceId-2");

when(omKeyResponse.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.CreateKey);
when(omBucketResponse.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.CreateBucket);
when(omSnapshotPurgeResponseProto1.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.SnapshotPurge);
when(omSnapshotPurgeResponseProto2.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.SnapshotPurge);
when(omSnapshotResponse1.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.CreateSnapshot);
when(omSnapshotResponse2.getCmdType()).thenReturn(OzoneManagerProtocolProtos.Type.CreateSnapshot);

when(omKeyCreateResponse.getOMResponse()).thenReturn(omKeyResponse);
when(omBucketCreateResponse.getOMResponse()).thenReturn(omBucketResponse);
when(omSnapshotCreateResponse1.getOMResponse())
.thenReturn(omSnapshotResponse1);
when(omSnapshotCreateResponse2.getOMResponse())
.thenReturn(omSnapshotResponse2);
when(omSnapshotPurgeResponse1.getOMResponse())
.thenReturn(omSnapshotPurgeResponseProto1);
when(omSnapshotPurgeResponse2.getOMResponse())
.thenReturn(omSnapshotPurgeResponseProto2);
}

@AfterEach
@@ -194,8 +204,35 @@ private static Stream<Arguments> doubleBufferFlushCases() {
omSnapshotCreateResponse1,
omSnapshotCreateResponse2,
omBucketCreateResponse),
4L, 4L, 14L, 16L, 1L, 1.142F)
);
4L, 4L, 14L, 16L, 1L, 1.142F),
Arguments.of(Arrays.asList(omSnapshotPurgeResponse1,
omSnapshotPurgeResponse2),
2L, 2L, 16L, 18L, 1L, 1.125F),
Arguments.of(Arrays.asList(omKeyCreateResponse,
omBucketCreateResponse,
omSnapshotPurgeResponse1,
omSnapshotPurgeResponse2),
3L, 4L, 19L, 22L, 2L, 1.157F),
Arguments.of(Arrays.asList(omKeyCreateResponse,
omSnapshotPurgeResponse1,
omBucketCreateResponse,
omSnapshotPurgeResponse2),
4L, 4L, 23L, 26L, 1L, 1.1300F),
Arguments.of(Arrays.asList(omKeyCreateResponse,
omSnapshotPurgeResponse1,
omSnapshotPurgeResponse2,
omBucketCreateResponse),
4L, 4L, 27L, 30L, 1L, 1.111F),
Arguments.of(Arrays.asList(omKeyCreateResponse,
omBucketCreateResponse,
omSnapshotPurgeResponse1,
omSnapshotCreateResponse1,
omSnapshotPurgeResponse2,
omBucketCreateResponse,
omSnapshotCreateResponse2),
6L, 7L, 33L, 37L, 2L, 1.121F)

);
}
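
As a reading aid (not part of the patch), the new flush cases line up with the splitting rule above. The groupings below are derived from splitReadyBufferAtCreateSnapshot; it is only an assumption that the first numeric argument of each case (2, 3, 4, 4 and 6 for the new cases) is the expected flush count.

// [purge1, purge2]                              -> [[purge1], [purge2]]
// [key, bucket, purge1, purge2]                 -> [[key, bucket], [purge1], [purge2]]
// [key, purge1, bucket, purge2]                 -> [[key], [purge1], [bucket], [purge2]]
// [key, purge1, purge2, bucket]                 -> [[key], [purge1], [purge2], [bucket]]
// [key, bucket, purge1, create1, purge2, bucket, create2]
//     -> [[key, bucket], [purge1], [create1], [purge2], [bucket], [create2]]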

/**