Skip to content

Commit d97fa28

Browse files
fapiftaChenSammi
authored andcommitted
HDDS-3925. SCM Pipeline DB should directly use UUID bytes for key rather than rely on proto serialization for key. (#1197)
(cherry picked from commit 0a1cce5)
1 parent 99e7b7a commit d97fa28

File tree

9 files changed

+602
-4
lines changed

9 files changed

+602
-4
lines changed

hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,18 @@ public class RDBStoreIterator
3232
implements TableIterator<byte[], ByteArrayKeyValue> {
3333

3434
private RocksIterator rocksDBIterator;
35+
private RDBTable rocksDBTable;
3536

3637
public RDBStoreIterator(RocksIterator iterator) {
3738
this.rocksDBIterator = iterator;
3839
rocksDBIterator.seekToFirst();
3940
}
4041

42+
public RDBStoreIterator(RocksIterator iterator, RDBTable table) {
43+
this(iterator);
44+
this.rocksDBTable = table;
45+
}
46+
4147
@Override
4248
public void forEachRemaining(
4349
Consumer<? super ByteArrayKeyValue> action) {
@@ -100,6 +106,16 @@ public ByteArrayKeyValue value() {
100106
return null;
101107
}
102108

109+
@Override
110+
public void removeFromDB() throws IOException {
111+
if (rocksDBTable == null) {
112+
throw new UnsupportedOperationException("remove");
113+
}
114+
if (rocksDBIterator.isValid()) {
115+
rocksDBTable.delete(rocksDBIterator.key());
116+
}
117+
}
118+
103119
@Override
104120
public void close() throws IOException {
105121
rocksDBIterator.close();

hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ public void deleteWithBatch(BatchOperation batch, byte[] key)
206206
public TableIterator<byte[], ByteArrayKeyValue> iterator() {
207207
ReadOptions readOptions = new ReadOptions();
208208
readOptions.setFillCache(false);
209-
return new RDBStoreIterator(db.newIterator(handle, readOptions));
209+
return new RDBStoreIterator(db.newIterator(handle, readOptions), this);
210210
}
211211

212212
@Override

hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TableIterator.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,12 @@ public interface TableIterator<KEY, T> extends Iterator<T>, Closeable {
6060
*/
6161
T value();
6262

63+
/**
64+
* Remove the actual value of the iterator from the database table on
65+
* which the iterator is working on.
66+
*
67+
* @throws IOException when there is an error occured during deletion.
68+
*/
69+
void removeFromDB() throws IOException;
70+
6371
}

hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,5 +420,10 @@ public TypedKeyValue next() {
420420
return new TypedKeyValue(rawIterator.next(), keyType,
421421
valueType);
422422
}
423+
424+
@Override
425+
public void removeFromDB() throws IOException {
426+
rawIterator.removeFromDB();
427+
}
423428
}
424429
}
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
*/
19+
package org.apache.hadoop.hdds.utils.db;
20+
21+
import org.junit.Before;
22+
import org.junit.Test;
23+
import org.mockito.ArgumentCaptor;
24+
import org.mockito.InOrder;
25+
import org.rocksdb.RocksIterator;
26+
27+
import java.util.NoSuchElementException;
28+
import java.util.function.Consumer;
29+
30+
import static org.junit.Assert.assertArrayEquals;
31+
import static org.junit.Assert.assertFalse;
32+
import static org.junit.Assert.assertTrue;
33+
import static org.mockito.ArgumentMatchers.any;
34+
import static org.mockito.Mockito.inOrder;
35+
import static org.mockito.Mockito.mock;
36+
import static org.mockito.Mockito.never;
37+
import static org.mockito.Mockito.times;
38+
import static org.mockito.Mockito.verify;
39+
import static org.mockito.Mockito.when;
40+
41+
/**
42+
* This test prescribe expected behaviour from the RDBStoreIterator which wraps
43+
* RocksDB's own iterator. Ozone internally in TypedTableIterator uses, the
44+
* RDBStoreIterator to provide iteration over table elements in a typed manner.
45+
* The tests are to ensure we access RocksDB via the iterator properly.
46+
*/
47+
public class TestRDBStoreIterator {
48+
49+
private RocksIterator rocksDBIteratorMock;
50+
private RDBTable rocksTableMock;
51+
52+
@Before
53+
public void setup() {
54+
rocksDBIteratorMock = mock(RocksIterator.class);
55+
rocksTableMock = mock(RDBTable.class);
56+
}
57+
58+
@Test
59+
public void testForeachRemainingCallsConsumerWithAllElements() {
60+
when(rocksDBIteratorMock.isValid())
61+
.thenReturn(true, true, true, true, true, true, false);
62+
when(rocksDBIteratorMock.key())
63+
.thenReturn(new byte[]{0x00}, new byte[]{0x01}, new byte[]{0x02})
64+
.thenThrow(new NoSuchElementException());
65+
when(rocksDBIteratorMock.value())
66+
.thenReturn(new byte[]{0x7f}, new byte[]{0x7e}, new byte[]{0x7d})
67+
.thenThrow(new NoSuchElementException());
68+
69+
70+
Consumer<ByteArrayKeyValue> consumerStub = mock(Consumer.class);
71+
72+
RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
73+
iter.forEachRemaining(consumerStub);
74+
75+
ArgumentCaptor<ByteArrayKeyValue> capture =
76+
ArgumentCaptor.forClass(ByteArrayKeyValue.class);
77+
verify(consumerStub, times(3)).accept(capture.capture());
78+
assertArrayEquals(
79+
new byte[]{0x00}, capture.getAllValues().get(0).getKey());
80+
assertArrayEquals(
81+
new byte[]{0x7f}, capture.getAllValues().get(0).getValue());
82+
assertArrayEquals(
83+
new byte[]{0x01}, capture.getAllValues().get(1).getKey());
84+
assertArrayEquals(
85+
new byte[]{0x7e}, capture.getAllValues().get(1).getValue());
86+
assertArrayEquals(
87+
new byte[]{0x02}, capture.getAllValues().get(2).getKey());
88+
assertArrayEquals(
89+
new byte[]{0x7d}, capture.getAllValues().get(2).getValue());
90+
}
91+
92+
@Test
93+
public void testHasNextDependsOnIsvalid(){
94+
when(rocksDBIteratorMock.isValid()).thenReturn(true, false);
95+
96+
RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
97+
98+
assertTrue(iter.hasNext());
99+
assertFalse(iter.hasNext());
100+
}
101+
102+
@Test
103+
public void testNextCallsIsValidThenGetsTheValueAndStepsToNext() {
104+
when(rocksDBIteratorMock.isValid()).thenReturn(true);
105+
RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
106+
107+
InOrder verifier = inOrder(rocksDBIteratorMock);
108+
109+
iter.next();
110+
111+
verifier.verify(rocksDBIteratorMock).isValid();
112+
verifier.verify(rocksDBIteratorMock).key();
113+
verifier.verify(rocksDBIteratorMock).value();
114+
verifier.verify(rocksDBIteratorMock).next();
115+
}
116+
117+
@Test
118+
public void testConstructorSeeksToFirstElement() {
119+
new RDBStoreIterator(rocksDBIteratorMock);
120+
121+
verify(rocksDBIteratorMock, times(1)).seekToFirst();
122+
}
123+
124+
@Test
125+
public void testSeekToFirstSeeks() {
126+
RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
127+
128+
iter.seekToFirst();
129+
130+
verify(rocksDBIteratorMock, times(2)).seekToFirst();
131+
}
132+
133+
@Test
134+
public void testSeekToLastSeeks() {
135+
RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
136+
137+
iter.seekToLast();
138+
139+
verify(rocksDBIteratorMock, times(1)).seekToLast();
140+
}
141+
142+
@Test
143+
public void testSeekReturnsTheActualKey() {
144+
when(rocksDBIteratorMock.isValid()).thenReturn(true);
145+
when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00});
146+
when(rocksDBIteratorMock.value()).thenReturn(new byte[]{0x7f});
147+
148+
RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
149+
ByteArrayKeyValue val = iter.seek(new byte[]{0x55});
150+
151+
InOrder verifier = inOrder(rocksDBIteratorMock);
152+
153+
verify(rocksDBIteratorMock, times(1)).seekToFirst(); //at construct time
154+
verify(rocksDBIteratorMock, never()).seekToLast();
155+
verifier.verify(rocksDBIteratorMock, times(1)).seek(any(byte[].class));
156+
verifier.verify(rocksDBIteratorMock, times(1)).isValid();
157+
verifier.verify(rocksDBIteratorMock, times(1)).key();
158+
verifier.verify(rocksDBIteratorMock, times(1)).value();
159+
assertArrayEquals(new byte[]{0x00}, val.getKey());
160+
assertArrayEquals(new byte[]{0x7f}, val.getValue());
161+
}
162+
163+
@Test
164+
public void testGettingTheKeyIfIteratorIsValid() {
165+
when(rocksDBIteratorMock.isValid()).thenReturn(true);
166+
when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00});
167+
168+
RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
169+
byte[] key = iter.key();
170+
171+
InOrder verifier = inOrder(rocksDBIteratorMock);
172+
173+
verifier.verify(rocksDBIteratorMock, times(1)).isValid();
174+
verifier.verify(rocksDBIteratorMock, times(1)).key();
175+
assertArrayEquals(new byte[]{0x00}, key);
176+
}
177+
178+
@Test
179+
public void testGettingTheValueIfIteratorIsValid() {
180+
when(rocksDBIteratorMock.isValid()).thenReturn(true);
181+
when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00});
182+
when(rocksDBIteratorMock.value()).thenReturn(new byte[]{0x7f});
183+
184+
RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
185+
ByteArrayKeyValue val = iter.value();
186+
187+
InOrder verifier = inOrder(rocksDBIteratorMock);
188+
189+
verifier.verify(rocksDBIteratorMock, times(1)).isValid();
190+
verifier.verify(rocksDBIteratorMock, times(1)).key();
191+
assertArrayEquals(new byte[]{0x00}, val.getKey());
192+
assertArrayEquals(new byte[]{0x7f}, val.getValue());
193+
}
194+
195+
@Test
196+
public void testRemovingFromDBActuallyDeletesFromTable() throws Exception {
197+
byte[] testKey = new byte[]{0x00};
198+
when(rocksDBIteratorMock.isValid()).thenReturn(true);
199+
when(rocksDBIteratorMock.key()).thenReturn(testKey);
200+
201+
RDBStoreIterator iter =
202+
new RDBStoreIterator(rocksDBIteratorMock, rocksTableMock);
203+
iter.removeFromDB();
204+
205+
InOrder verifier = inOrder(rocksDBIteratorMock, rocksTableMock);
206+
207+
verifier.verify(rocksDBIteratorMock, times(1)).isValid();
208+
verifier.verify(rocksTableMock, times(1)).delete(testKey);
209+
}
210+
211+
@Test(expected = UnsupportedOperationException.class)
212+
public void testRemoveFromDBWithoutDBTableSet() throws Exception {
213+
RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
214+
iter.removeFromDB();
215+
}
216+
217+
@Test
218+
public void testCloseCloses() throws Exception {
219+
RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
220+
iter.close();
221+
222+
verify(rocksDBIteratorMock, times(1)).close();
223+
}
224+
}

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineIDCodec.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
package org.apache.hadoop.hdds.scm.metadata;
2020

2121
import java.io.IOException;
22+
import java.nio.ByteBuffer;
23+
import java.util.Arrays;
24+
import java.util.UUID;
2225

2326
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
2427
import org.apache.hadoop.hdds.utils.db.Codec;
@@ -30,12 +33,43 @@ public class PipelineIDCodec implements Codec<PipelineID> {
3033

3134
@Override
3235
public byte[] toPersistedFormat(PipelineID object) throws IOException {
33-
return object.getProtobuf().toByteArray();
36+
byte[] bytes = new byte[16];
37+
System.arraycopy(
38+
asByteArray(object.getId().getMostSignificantBits()), 0, bytes, 0, 8);
39+
System.arraycopy(
40+
asByteArray(object.getId().getLeastSignificantBits()), 0, bytes, 8, 8);
41+
return bytes;
42+
}
43+
44+
private byte[] asByteArray(long bits) {
45+
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
46+
buffer.putLong(bits);
47+
return buffer.array();
3448
}
3549

3650
@Override
3751
public PipelineID fromPersistedFormat(byte[] rawData) throws IOException {
38-
return null;
52+
long mostSiginificantBits = toLong(rawData, 0);
53+
long leastSignificantBits = toLong(rawData, 8);
54+
55+
UUID id = new UUID(mostSiginificantBits, leastSignificantBits);
56+
return PipelineID.valueOf(id);
57+
}
58+
59+
private long toLong(byte[] arr, int startIdx) throws IOException {
60+
if (arr.length < startIdx + 8) {
61+
throw new IOException("Key conversion error.",
62+
new ArrayIndexOutOfBoundsException(
63+
"Key does not have the least expected amount of bytes,"
64+
+ "and does not contain a UUID. Key: "
65+
+ Arrays.toString(arr)
66+
)
67+
);
68+
}
69+
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
70+
buffer.put(arr, startIdx, 8);
71+
buffer.flip();
72+
return buffer.getLong();
3973
}
4074

4175
@Override

0 commit comments

Comments
 (0)