diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java index 784738b0cec2..5902486ec6ee 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java @@ -32,12 +32,18 @@ public class RDBStoreIterator implements TableIterator { private RocksIterator rocksDBIterator; + private RDBTable rocksDBTable; public RDBStoreIterator(RocksIterator iterator) { this.rocksDBIterator = iterator; rocksDBIterator.seekToFirst(); } + public RDBStoreIterator(RocksIterator iterator, RDBTable table) { + this(iterator); + this.rocksDBTable = table; + } + @Override public void forEachRemaining( Consumer action) { @@ -100,6 +106,16 @@ public ByteArrayKeyValue value() { return null; } + @Override + public void removeFromDB() throws IOException { + if (rocksDBTable == null) { + throw new UnsupportedOperationException("remove"); + } + if (rocksDBIterator.isValid()) { + rocksDBTable.delete(rocksDBIterator.key()); + } + } + @Override public void close() throws IOException { rocksDBIterator.close(); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java index 2e390e2362d1..4dbb59ad4412 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java @@ -206,7 +206,7 @@ public void deleteWithBatch(BatchOperation batch, byte[] key) public TableIterator iterator() { ReadOptions readOptions = new ReadOptions(); readOptions.setFillCache(false); - return new RDBStoreIterator(db.newIterator(handle, readOptions)); + return new RDBStoreIterator(db.newIterator(handle, readOptions), this); } @Override diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TableIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TableIterator.java index a684157a43b1..c9bc045b1df1 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TableIterator.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TableIterator.java @@ -60,4 +60,12 @@ public interface TableIterator extends Iterator, Closeable { */ T value(); + /** + * Remove the actual value of the iterator from the database table on + * which the iterator is working on. + * + * @throws IOException when there is an error occured during deletion. + */ + void removeFromDB() throws IOException; + } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java index 86d23afb9318..1451946f30dc 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java @@ -420,5 +420,10 @@ public TypedKeyValue next() { return new TypedKeyValue(rawIterator.next(), keyType, valueType); } + + @Override + public void removeFromDB() throws IOException { + rawIterator.removeFromDB(); + } } } diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java new file mode 100644 index 000000000000..6e85977843ac --- /dev/null +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.hadoop.hdds.utils.db; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; +import org.rocksdb.RocksIterator; + +import java.util.NoSuchElementException; +import java.util.function.Consumer; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * This test prescribe expected behaviour from the RDBStoreIterator which wraps + * RocksDB's own iterator. Ozone internally in TypedTableIterator uses, the + * RDBStoreIterator to provide iteration over table elements in a typed manner. + * The tests are to ensure we access RocksDB via the iterator properly. + */ +public class TestRDBStoreIterator { + + private RocksIterator rocksDBIteratorMock; + private RDBTable rocksTableMock; + + @Before + public void setup() { + rocksDBIteratorMock = mock(RocksIterator.class); + rocksTableMock = mock(RDBTable.class); + } + + @Test + public void testForeachRemainingCallsConsumerWithAllElements() { + when(rocksDBIteratorMock.isValid()) + .thenReturn(true, true, true, true, true, true, false); + when(rocksDBIteratorMock.key()) + .thenReturn(new byte[]{0x00}, new byte[]{0x01}, new byte[]{0x02}) + .thenThrow(new NoSuchElementException()); + when(rocksDBIteratorMock.value()) + .thenReturn(new byte[]{0x7f}, new byte[]{0x7e}, new byte[]{0x7d}) + .thenThrow(new NoSuchElementException()); + + + Consumer consumerStub = mock(Consumer.class); + + RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock); + iter.forEachRemaining(consumerStub); + + ArgumentCaptor capture = + ArgumentCaptor.forClass(ByteArrayKeyValue.class); + verify(consumerStub, times(3)).accept(capture.capture()); + assertArrayEquals( + new byte[]{0x00}, capture.getAllValues().get(0).getKey()); + assertArrayEquals( + new byte[]{0x7f}, capture.getAllValues().get(0).getValue()); + assertArrayEquals( + new byte[]{0x01}, capture.getAllValues().get(1).getKey()); + assertArrayEquals( + new byte[]{0x7e}, capture.getAllValues().get(1).getValue()); + assertArrayEquals( + new byte[]{0x02}, capture.getAllValues().get(2).getKey()); + assertArrayEquals( + new byte[]{0x7d}, capture.getAllValues().get(2).getValue()); + } + + @Test + public void testHasNextDependsOnIsvalid(){ + when(rocksDBIteratorMock.isValid()).thenReturn(true, false); + + RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock); + + assertTrue(iter.hasNext()); + assertFalse(iter.hasNext()); + } + + @Test + public void testNextCallsIsValidThenGetsTheValueAndStepsToNext() { + when(rocksDBIteratorMock.isValid()).thenReturn(true); + RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock); + + InOrder verifier = inOrder(rocksDBIteratorMock); + + iter.next(); + + verifier.verify(rocksDBIteratorMock).isValid(); + verifier.verify(rocksDBIteratorMock).key(); + verifier.verify(rocksDBIteratorMock).value(); + verifier.verify(rocksDBIteratorMock).next(); + } + + @Test + public void testConstructorSeeksToFirstElement() { + new RDBStoreIterator(rocksDBIteratorMock); + + verify(rocksDBIteratorMock, times(1)).seekToFirst(); + } + + @Test + public void testSeekToFirstSeeks() { + RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock); + + iter.seekToFirst(); + + verify(rocksDBIteratorMock, times(2)).seekToFirst(); + } + + @Test + public void testSeekToLastSeeks() { + RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock); + + iter.seekToLast(); + + verify(rocksDBIteratorMock, times(1)).seekToLast(); + } + + @Test + public void testSeekReturnsTheActualKey() { + when(rocksDBIteratorMock.isValid()).thenReturn(true); + when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00}); + when(rocksDBIteratorMock.value()).thenReturn(new byte[]{0x7f}); + + RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock); + ByteArrayKeyValue val = iter.seek(new byte[]{0x55}); + + InOrder verifier = inOrder(rocksDBIteratorMock); + + verify(rocksDBIteratorMock, times(1)).seekToFirst(); //at construct time + verify(rocksDBIteratorMock, never()).seekToLast(); + verifier.verify(rocksDBIteratorMock, times(1)).seek(any(byte[].class)); + verifier.verify(rocksDBIteratorMock, times(1)).isValid(); + verifier.verify(rocksDBIteratorMock, times(1)).key(); + verifier.verify(rocksDBIteratorMock, times(1)).value(); + assertArrayEquals(new byte[]{0x00}, val.getKey()); + assertArrayEquals(new byte[]{0x7f}, val.getValue()); + } + + @Test + public void testGettingTheKeyIfIteratorIsValid() { + when(rocksDBIteratorMock.isValid()).thenReturn(true); + when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00}); + + RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock); + byte[] key = iter.key(); + + InOrder verifier = inOrder(rocksDBIteratorMock); + + verifier.verify(rocksDBIteratorMock, times(1)).isValid(); + verifier.verify(rocksDBIteratorMock, times(1)).key(); + assertArrayEquals(new byte[]{0x00}, key); + } + + @Test + public void testGettingTheValueIfIteratorIsValid() { + when(rocksDBIteratorMock.isValid()).thenReturn(true); + when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00}); + when(rocksDBIteratorMock.value()).thenReturn(new byte[]{0x7f}); + + RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock); + ByteArrayKeyValue val = iter.value(); + + InOrder verifier = inOrder(rocksDBIteratorMock); + + verifier.verify(rocksDBIteratorMock, times(1)).isValid(); + verifier.verify(rocksDBIteratorMock, times(1)).key(); + assertArrayEquals(new byte[]{0x00}, val.getKey()); + assertArrayEquals(new byte[]{0x7f}, val.getValue()); + } + + @Test + public void testRemovingFromDBActuallyDeletesFromTable() throws Exception { + byte[] testKey = new byte[]{0x00}; + when(rocksDBIteratorMock.isValid()).thenReturn(true); + when(rocksDBIteratorMock.key()).thenReturn(testKey); + + RDBStoreIterator iter = + new RDBStoreIterator(rocksDBIteratorMock, rocksTableMock); + iter.removeFromDB(); + + InOrder verifier = inOrder(rocksDBIteratorMock, rocksTableMock); + + verifier.verify(rocksDBIteratorMock, times(1)).isValid(); + verifier.verify(rocksTableMock, times(1)).delete(testKey); + } + + @Test(expected = UnsupportedOperationException.class) + public void testRemoveFromDBWithoutDBTableSet() throws Exception { + RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock); + iter.removeFromDB(); + } + + @Test + public void testCloseCloses() throws Exception { + RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock); + iter.close(); + + verify(rocksDBIteratorMock, times(1)).close(); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineIDCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineIDCodec.java index d661e3467b2c..e73539f70fc6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineIDCodec.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineIDCodec.java @@ -19,6 +19,9 @@ package org.apache.hadoop.hdds.scm.metadata; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.UUID; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.utils.db.Codec; @@ -30,12 +33,43 @@ public class PipelineIDCodec implements Codec { @Override public byte[] toPersistedFormat(PipelineID object) throws IOException { - return object.getProtobuf().toByteArray(); + byte[] bytes = new byte[16]; + System.arraycopy( + asByteArray(object.getId().getMostSignificantBits()), 0, bytes, 0, 8); + System.arraycopy( + asByteArray(object.getId().getLeastSignificantBits()), 0, bytes, 8, 8); + return bytes; + } + + private byte[] asByteArray(long bits) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong(bits); + return buffer.array(); } @Override public PipelineID fromPersistedFormat(byte[] rawData) throws IOException { - return null; + long mostSiginificantBits = toLong(rawData, 0); + long leastSignificantBits = toLong(rawData, 8); + + UUID id = new UUID(mostSiginificantBits, leastSignificantBits); + return PipelineID.valueOf(id); + } + + private long toLong(byte[] arr, int startIdx) throws IOException { + if (arr.length < startIdx + 8) { + throw new IOException("Key conversion error.", + new ArrayIndexOutOfBoundsException( + "Key does not have the least expected amount of bytes," + + "and does not contain a UUID. Key: " + + Arrays.toString(arr) + ) + ); + } + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.put(arr, startIdx, 8); + buffer.flip(); + return buffer.getLong(); } @Override diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index e8223ca50455..fda937134c1c 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -161,12 +161,64 @@ protected void initializePipelineState() throws IOException { TableIterator> iterator = pipelineStore.iterator(); while (iterator.hasNext()) { - Pipeline pipeline = iterator.next().getValue(); + Pipeline pipeline = nextPipelineFromIterator(iterator); stateManager.addPipeline(pipeline); nodeManager.addPipeline(pipeline); } } + private Pipeline nextPipelineFromIterator( + TableIterator> it + ) throws IOException { + KeyValue actual = it.next(); + Pipeline pipeline = actual.getValue(); + PipelineID pipelineID = actual.getKey(); + checkKeyAndReplaceIfObsolete(it, pipeline, pipelineID); + return pipeline; + } + + /** + * This method is part of the change that happens in HDDS-3925, and we can + * and should remove this on later on. + * The purpose of the change is to get rid of protobuf serialization in the + * SCM database Pipeline table keys. The keys are not used anywhere, and the + * PipelineID that is used as a key is in the value as well, so we can detect + * a change in the key translation to byte[] and if we have the old format + * we refresh the table contents during SCM startup. + * + * If this fails in the remove, then there is an IOException coming from + * RocksDB itself, in this case in memory structures will still be fine and + * SCM should be operational, however we will attempt to replace the old key + * at next startup. In this case removing of the pipeline will leave the + * pipeline in RocksDB, and during next startup we will attempt to delete it + * again. This does not affect any runtime operations. + * If a Pipeline should have been deleted but remained in RocksDB, then at + * next startup it will be replaced and added with the new key, then SCM will + * detect that it is an invalid Pipeline and successfully delete it with the + * new key. + * For further info check the JIRA. + * + * @param it the iterator used to iterate the Pipeline table + * @param pipeline the pipeline read already from the iterator + * @param pipelineID the pipeline ID read from the raw data via the iterator + */ + private void checkKeyAndReplaceIfObsolete( + TableIterator> it, + Pipeline pipeline, + PipelineID pipelineID + ) { + if (!pipelineID.equals(pipeline.getId())) { + try { + it.removeFromDB(); + pipelineStore.put(pipeline.getId(), pipeline); + } catch (IOException e) { + LOG.info("Pipeline table in RocksDB has an old key format, and " + + "removing the pipeline with the old key was unsuccessful." + + "Pipeline: {}", pipeline); + } + } + } + private void recordMetricsForPipeline(Pipeline pipeline) { metrics.incNumPipelineAllocated(); if (pipeline.isOpen()) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/metadata/TestPipelineIDCodec.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/metadata/TestPipelineIDCodec.java new file mode 100644 index 000000000000..5543be5832b1 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/metadata/TestPipelineIDCodec.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.metadata; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.junit.Test; + +import java.util.UUID; + +/** + * Testing serialization of PipelineID objects to/from RocksDB. + */ +public class TestPipelineIDCodec { + + @Test + public void testPersistingZeroAsUUID() throws Exception { + long leastSigBits = 0x0000_0000_0000_0000L; + long mostSigBits = 0x0000_0000_0000_0000L; + byte[] expected = new byte[] { + b(0x00), b(0x00), b(0x00), b(0x00), b(0x00), b(0x00), b(0x00), b(0x00), + b(0x00), b(0x00), b(0x00), b(0x00), b(0x00), b(0x00), b(0x00), b(0x00) + }; + + checkPersisting(leastSigBits, mostSigBits, expected); + } + + @Test + public void testPersistingFFAsUUID() throws Exception { + long leastSigBits = 0xFFFF_FFFF_FFFF_FFFFL; + long mostSigBits = 0xFFFF_FFFF_FFFF_FFFFL; + byte[] expected = new byte[] { + b(0xFF), b(0xFF), b(0xFF), b(0xFF), b(0xFF), b(0xFF), b(0xFF), b(0xFF), + b(0xFF), b(0xFF), b(0xFF), b(0xFF), b(0xFF), b(0xFF), b(0xFF), b(0xFF) + }; + + checkPersisting(leastSigBits, mostSigBits, expected); + } + + @Test + public void testPersistingARandomUUID() throws Exception { + for (int i=0; i<100; i++) { + UUID uuid = UUID.randomUUID(); + + long mask = 0x0000_0000_0000_00FFL; + + byte[] expected = new byte[] { + b(((int) (uuid.getMostSignificantBits() >> 56 & mask))), + b(((int) (uuid.getMostSignificantBits() >> 48 & mask))), + b(((int) (uuid.getMostSignificantBits() >> 40 & mask))), + b(((int) (uuid.getMostSignificantBits() >> 32 & mask))), + b(((int) (uuid.getMostSignificantBits() >> 24 & mask))), + b(((int) (uuid.getMostSignificantBits() >> 16 & mask))), + b(((int) (uuid.getMostSignificantBits() >> 8 & mask))), + b(((int) (uuid.getMostSignificantBits() & mask))), + + b(((int) (uuid.getLeastSignificantBits() >> 56 & mask))), + b(((int) (uuid.getLeastSignificantBits() >> 48 & mask))), + b(((int) (uuid.getLeastSignificantBits() >> 40 & mask))), + b(((int) (uuid.getLeastSignificantBits() >> 32 & mask))), + b(((int) (uuid.getLeastSignificantBits() >> 24 & mask))), + b(((int) (uuid.getLeastSignificantBits() >> 16 & mask))), + b(((int) (uuid.getLeastSignificantBits() >> 8 & mask))), + b(((int) (uuid.getLeastSignificantBits() & mask))), + }; + + checkPersisting( + uuid.getMostSignificantBits(), + uuid.getLeastSignificantBits(), + expected + ); + } + } + + @Test + public void testConvertAndReadBackZeroAsUUID() throws Exception { + long mostSigBits = 0x0000_0000_0000_0000L; + long leastSigBits = 0x0000_0000_0000_0000L; + UUID uuid = new UUID(mostSigBits, leastSigBits); + PipelineID pid = PipelineID.valueOf(uuid); + + byte[] encoded = new PipelineIDCodec().toPersistedFormat(pid); + PipelineID decoded = new PipelineIDCodec().fromPersistedFormat(encoded); + + assertEquals(pid, decoded); + } + + @Test + public void testConvertAndReadBackFFAsUUID() throws Exception { + long mostSigBits = 0xFFFF_FFFF_FFFF_FFFFL; + long leastSigBits = 0xFFFF_FFFF_FFFF_FFFFL; + UUID uuid = new UUID(mostSigBits, leastSigBits); + PipelineID pid = PipelineID.valueOf(uuid); + + byte[] encoded = new PipelineIDCodec().toPersistedFormat(pid); + PipelineID decoded = new PipelineIDCodec().fromPersistedFormat(encoded); + + assertEquals(pid, decoded); + } + + @Test + public void testConvertAndReadBackRandomUUID() throws Exception { + UUID uuid = UUID.randomUUID(); + PipelineID pid = PipelineID.valueOf(uuid); + + byte[] encoded = new PipelineIDCodec().toPersistedFormat(pid); + PipelineID decoded = new PipelineIDCodec().fromPersistedFormat(encoded); + + assertEquals(pid, decoded); + } + + private void checkPersisting( + long mostSigBits, long leastSigBits, byte[] expected + ) throws Exception { + UUID uuid = new UUID(mostSigBits, leastSigBits); + PipelineID pid = PipelineID.valueOf(uuid); + + byte[] encoded = new PipelineIDCodec().toPersistedFormat(pid); + + assertArrayEquals(expected, encoded); + } + + private byte b(int i) { + return (byte) (i & 0x0000_00FF); + } +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java index 7c2f17e85840..fc8f61a7dbf1 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -21,8 +21,10 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -37,11 +39,15 @@ import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.MockNodeManager; import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.metadata.PipelineIDCodec; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl; import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.Table.KeyValue; +import org.apache.hadoop.hdds.utils.db.TableIterator; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.test.GenericTestUtils; @@ -56,7 +62,14 @@ import static org.junit.Assert.fail; import org.junit.Before; import org.junit.Test; +import org.mockito.InOrder; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Test cases to verify PipelineManager. @@ -539,6 +552,108 @@ public void testSafeModeUpdatedOnSafemodeExit() pipelineManager.close(); } + /** + * This test was created for HDDS-3925 to check whether the db handling is + * proper at the SCMPipelineManager level. We should remove this test + * when we remove the key swap from the SCMPipelineManager code. + * + * The test emulates internally the values that the iterator will provide + * back to the check-fix code path. The iterator internally deserialize the + * key stored in RocksDB using the PipelineIDCodec. The older version of the + * codec serialized the PipelineIDs by taking the byte[] representation of + * the protobuf representation of the PipelineID, and deserialization was not + * implemented. + * + * In order to be able to check and fix the change, the deserialization was + * introduced, and deserialisation of the old protobuf byte representation + * with the new deserialization logic of the keys are + * checked against the PipelineID serialized in the value as well via + * protobuf. + * The DB is storing the keys now based on a byte[] serialized from the UUID + * inside the PipelineID. + * For this we emulate the getKey of the KeyValue returned by the + * iterator to return a PipelineID that is deserialized from the byte[] + * representation of the protobuf representation of the PipelineID in the + * test, as that would be the value we get from the iterator when iterating + * through a table with the old key format. + * + * @throws Exception when something goes wrong + */ + @Test + public void testPipelineDBKeyFormatChange() throws Exception { + Pipeline p1 = pipelineStub(); + Pipeline p2 = pipelineStub(); + Pipeline p3 = pipelineStub(); + + TableIterator> iteratorMock = + mock(TableIterator.class); + + KeyValue kv1 = + mockKeyValueToProvideOldKeyFormat(p1); + KeyValue kv2 = + mockKeyValueToProvideNormalFormat(p2); + KeyValue kv3 = + mockKeyValueToProvideOldKeyFormat(p3); + + when(iteratorMock.next()) + .thenReturn(kv1, kv2, kv3) + .thenThrow(new NoSuchElementException()); + when(iteratorMock.hasNext()) + .thenReturn(true, true, true, false); + + Table pipelineStore = mock(Table.class); + doReturn(iteratorMock).when(pipelineStore).iterator(); + when(pipelineStore.isEmpty()).thenReturn(false); + + InOrder inorderVerifier = inOrder(pipelineStore, iteratorMock); + + new SCMPipelineManager(conf, nodeManager, pipelineStore, new EventQueue()); + + inorderVerifier.verify(iteratorMock).removeFromDB(); + inorderVerifier.verify(pipelineStore).put(p1.getId(), p1); + inorderVerifier.verify(iteratorMock).removeFromDB(); + inorderVerifier.verify(pipelineStore).put(p3.getId(), p3); + + verify(pipelineStore, never()).put(p2.getId(), p2); + } + + private Pipeline pipelineStub() { + return Pipeline.newBuilder() + .setId(PipelineID.randomId()) + .setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.ONE) + .setState(Pipeline.PipelineState.OPEN) + .setNodes( + Arrays.asList( + nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).get(0) + ) + ) + .setNodesInOrder(Arrays.asList(0)) + .build(); + } + + private KeyValue + mockKeyValueToProvideOldKeyFormat(Pipeline pipeline) + throws IOException { + KeyValue kv = mock(KeyValue.class); + when(kv.getValue()).thenReturn(pipeline); + when(kv.getKey()) + .thenReturn( + new PipelineIDCodec().fromPersistedFormat( + pipeline.getId().getProtobuf().toByteArray() + )); + return kv; + } + + private KeyValue + mockKeyValueToProvideNormalFormat(Pipeline pipeline) + throws IOException { + KeyValue kv = mock(KeyValue.class); + when(kv.getValue()).thenReturn(pipeline); + when(kv.getKey()).thenReturn(pipeline.getId()); + return kv; + } + private void sendPipelineReport(DatanodeDetails dn, Pipeline pipeline, PipelineReportHandler pipelineReportHandler, boolean isLeader, EventQueue eventQueue) {