Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,13 @@ <KEY, VALUE> void move(KEY sourceKey, KEY destKey, VALUE value,
* @return codec registry.
*/
CodecRegistry getCodecRegistry();

/**
 * Get data written to DB since a specific sequence number.
 * @param sequenceNumber sequence number after which DB updates are requested;
 *                       updates with sequence numbers at or below this value
 *                       are excluded from the result.
 * @return wrapper holding the serialized write batches read from the DB log,
 *         along with the latest sequence number seen.
 * @throws SequenceNumberNotFoundException if the DB can no longer serve
 *         updates starting right after the given sequence number.
 */
DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
throws SequenceNumberNotFoundException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.hadoop.utils.db;

import java.util.ArrayList;
import java.util.List;

/**
 * Wrapper class to hold DB data read from the RocksDB log file.
 */
public class DBUpdatesWrapper {

  // Serialized write-batch payloads, in the order they were added.
  private final List<byte[]> writeBatches = new ArrayList<>();

  // Highest sequence number among the added batches; -1 until one is added.
  private long latestSequenceNumber = -1;

  /**
   * Append one serialized write batch and track its sequence number.
   * @param data serialized write batch bytes.
   * @param sequenceNumber sequence number associated with this batch.
   */
  public void addWriteBatch(byte[] data, long sequenceNumber) {
    writeBatches.add(data);
    latestSequenceNumber = Math.max(latestSequenceNumber, sequenceNumber);
  }

  /**
   * @return the accumulated write batches.
   */
  public List<byte[]> getData() {
    return writeBatches;
  }

  /**
   * @return the highest sequence number seen, or -1 if no batch was added.
   */
  public long getCurrentSequenceNumber() {
    return latestSequenceNumber;
  }
}

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.rocksdb.FlushOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.TransactionLogIterator;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -327,6 +328,51 @@ public CodecRegistry getCodecRegistry() {
return codecRegistry;
}

@Override
public DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
throws SequenceNumberNotFoundException {

DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
try {
TransactionLogIterator transactionLogIterator =
db.getUpdatesSince(sequenceNumber);

// Only the first record needs to be checked if its seq number <
// ( 1 + passed_in_sequence_number). For example, if seqNumber passed
// in is 100, then we can read from the WAL ONLY if the first sequence
// number is <= 101. If it is 102, then 101 may already be flushed to
// SST. If it 99, we can skip 99 and 100, and then read from 101.

boolean checkValidStartingSeqNumber = true;

while (transactionLogIterator.isValid()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this imply flush to sst can happen while iterating the log?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, flushes can happen from memtables to SST anytime. If the WAL is deleted while it is being read, we will still be handle it through a retry mechanism from Recon side.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this means we need to at least configure the flushes defensively to maybe 3 * recon sync internal. Will that be a separate Jira or is already correctly configured?

TransactionLogIterator.BatchResult result =
transactionLogIterator.getBatch();
long currSequenceNumber = result.sequenceNumber();
if (checkValidStartingSeqNumber &&
currSequenceNumber > 1 + sequenceNumber) {
throw new SequenceNumberNotFoundException("Unable to read data from" +
" RocksDB wal to get delta updates. It may have already been" +
"flushed to SSTs.");
}
// If the above condition was not satisfied, then it is OK to reset
// the flag.
checkValidStartingSeqNumber = false;
if (currSequenceNumber <= sequenceNumber) {
transactionLogIterator.next();
continue;
}
dbUpdatesWrapper.addWriteBatch(result.writeBatch().data(),
result.sequenceNumber());
transactionLogIterator.next();
}
} catch (RocksDBException e) {
LOG.error("Unable to get delta updates since sequenceNumber {} ",
sequenceNumber, e);
}
return dbUpdatesWrapper;
}

@VisibleForTesting
public RocksDB getDb() {
return db;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.hadoop.utils.db;

import java.io.IOException;

/**
 * Thrown if RocksDB is unable to find requested data from WAL file.
 */
public class SequenceNumberNotFoundException extends IOException {

  private static final long serialVersionUID = 1L;

  /** Creates an exception with no detail message. */
  public SequenceNumberNotFoundException() {
    super();
  }

  /**
   * Creates an exception with a detail message.
   * @param message description of the sequence number lookup failure.
   */
  public SequenceNumberNotFoundException(String message) {
    super(message);
  }

  /**
   * Creates an exception with a detail message and the underlying cause,
   * so the original DB failure is preserved for callers.
   * @param message description of the sequence number lookup failure.
   * @param cause underlying failure.
   */
  public SequenceNumberNotFoundException(String message, Throwable cause) {
    super(message, cause);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,25 @@ public void testRocksDBKeyMayExistApi() throws Exception {
}
}

@Test
public void testGetDBUpdatesSince() throws Exception {

  try (RDBStore newStore =
      new RDBStore(folder.newFolder(), options, configSet)) {

    try (Table firstTable = newStore.getTable(families.get(1))) {
      firstTable.put(StringUtils.getBytesUtf16("Key1"), StringUtils
          .getBytesUtf16("Value1"));
      firstTable.put(StringUtils.getBytesUtf16("Key2"), StringUtils
          .getBytesUtf16("Value2"));
    }
    // Two puts => latest sequence number must be 2. assertEquals gives a
    // useful failure message, unlike assertTrue(a == b).
    Assert.assertEquals(2, newStore.getDb().getLatestSequenceNumber());

    // Requesting all updates since seq 0 must return both write batches.
    DBUpdatesWrapper dbUpdatesSince = newStore.getUpdatesSince(0);
    Assert.assertEquals(2, dbUpdatesSince.getData().size());
    Assert.assertEquals(2, dbUpdatesSince.getCurrentSequenceNumber());
  }
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ public static boolean isReadOnly(
case LookupFile:
case ListStatus:
case GetAcl:
case DBUpdates:
return true;
case CreateVolume:
case SetVolumeProperty:
Expand Down
12 changes: 12 additions & 0 deletions hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ enum Type {
ListMultiPartUploadParts = 50;

ServiceList = 51;
DBUpdates = 53;

GetDelegationToken = 61;
RenewDelegationToken = 62;
Expand Down Expand Up @@ -136,6 +137,7 @@ message OMRequest {
optional MultipartUploadListPartsRequest listMultipartUploadPartsRequest = 50;

optional ServiceListRequest serviceListRequest = 51;
optional DBUpdatesRequest dbUpdatesRequest = 53;

optional hadoop.common.GetDelegationTokenRequestProto getDelegationTokenRequest = 61;
optional hadoop.common.RenewDelegationTokenRequestProto renewDelegationTokenRequest= 62;
Expand Down Expand Up @@ -202,6 +204,7 @@ message OMResponse {
optional MultipartUploadListPartsResponse listMultipartUploadPartsResponse = 50;

optional ServiceListResponse ServiceListResponse = 51;
optional DBUpdatesResponse dbUpdatesResponse = 52;

optional GetDelegationTokenResponseProto getDelegationTokenResponse = 61;
optional RenewDelegationTokenResponseProto renewDelegationTokenResponse = 62;
Expand Down Expand Up @@ -836,11 +839,20 @@ message AllocateBlockResponse {
message ServiceListRequest {
}

// Request for DB write batches applied after the given sequence number.
message DBUpdatesRequest {
// Sequence number after which updates are requested; batches at or below
// this number are excluded.
required uint64 sequenceNumber = 1;
}

message ServiceListResponse {

repeated ServiceInfo serviceInfo = 2;
}

// Response carrying serialized DB write batches for a DBUpdatesRequest.
message DBUpdatesResponse {
// Latest sequence number covered by the returned data.
required uint64 sequenceNumber = 1;
// Serialized write-batch payloads, one entry per batch.
repeated bytes data = 2;
}

message ServicePort {
enum Type {
RPC = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
Expand All @@ -81,6 +82,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.db.DBUpdatesWrapper;
import org.apache.hadoop.utils.db.RDBStore;
import org.apache.hadoop.utils.db.Table;
import org.apache.hadoop.utils.db.Table.KeyValue;
Expand Down Expand Up @@ -1395,8 +1397,41 @@ public void testDBKeyMayExist() throws Exception {
RDBStore rdbStore = (RDBStore) cluster.getOzoneManager()
.getMetadataManager().getStore();
RocksDB db = rdbStore.getDb();
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

OmKeyInfo keyInfo = getNewOmKeyInfo();
OmKeyInfoCodec omKeyInfoCodec = new OmKeyInfoCodec();

db.put(StringUtils.getBytesUtf16("OMKey1"),
omKeyInfoCodec.toPersistedFormat(keyInfo));

StringBuilder sb = new StringBuilder();
Assert.assertTrue(db.keyMayExist(StringUtils.getBytesUtf16("OMKey1"),
sb));
Assert.assertTrue(sb.length() > 0);
}


@Test
public void testGetOMDBUpdates() throws IOException {

// Request every update since the beginning of the OM DB log.
DBUpdatesRequest dbUpdatesRequest =
DBUpdatesRequest.newBuilder().setSequenceNumber(0).build();

// NOTE(review): this assumes no earlier test in this class has written to
// the shared minicluster's OM DB — TODO confirm test ordering/isolation.
DBUpdatesWrapper dbUpdates =
cluster.getOzoneManager().getDBUpdates(dbUpdatesRequest);
Assert.assertTrue(dbUpdates.getData().isEmpty());

//Write data to OM.
OmKeyInfo keyInfo = getNewOmKeyInfo();
Assert.assertNotNull(keyInfo);
// After the write, the same request must return at least one batch.
dbUpdates =
cluster.getOzoneManager().getDBUpdates(dbUpdatesRequest);
Assert.assertFalse(dbUpdates.getData().isEmpty());

}

private OmKeyInfo getNewOmKeyInfo() throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder()
.setVolume("vol1")
.setAdminName("bilbo")
Expand All @@ -1423,16 +1458,6 @@ public void testDBKeyMayExist() throws Exception {
.build();
OpenKeySession keySession = cluster.getOzoneManager().getKeyManager()
.openKey(keyArgs);
OmKeyInfo keyInfo = keySession.getKeyInfo();
OmKeyInfoCodec omKeyInfoCodec = new OmKeyInfoCodec();

db.put(StringUtils.getBytesUtf16("OMKey1"),
omKeyInfoCodec.toPersistedFormat(keyInfo));

StringBuilder sb = new StringBuilder();
Assert.assertTrue(db.keyMayExist(StringUtils.getBytesUtf16("OMKey1"),
sb));
Assert.assertTrue(sb.length() > 0);
return keySession.getKeyInfo();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol;
import org.apache.hadoop.ozone.om.snapshot.OzoneManagerSnapshotProvider;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyArgs;
import org.apache.hadoop.ozone.security.OzoneSecurityException;
Expand Down Expand Up @@ -144,6 +145,8 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.utils.RetriableTask;
import org.apache.hadoop.utils.db.DBUpdatesWrapper;
import org.apache.hadoop.utils.db.SequenceNumberNotFoundException;
import org.apache.hadoop.utils.db.DBCheckpoint;
import org.apache.hadoop.utils.db.DBStore;
import org.apache.ratis.server.protocol.TermIndex;
Expand Down Expand Up @@ -3346,4 +3349,18 @@ public boolean isLeader() {
/**
 * @return value of the isRatisEnabled flag for this OM instance.
 */
public boolean isRatisEnabled() {
return isRatisEnabled;
}

/**
 * Fetch DB updates that occurred after a given sequence number.
 * @param dbUpdatesRequest request carrying the starting sequence number.
 * @return wrapper with the write batches read since that sequence number.
 * @throws SequenceNumberNotFoundException if the store cannot serve updates
 *         from the requested sequence number.
 */
public DBUpdatesWrapper getDBUpdates(
    DBUpdatesRequest dbUpdatesRequest)
    throws SequenceNumberNotFoundException {
  long sinceSequenceNumber = dbUpdatesRequest.getSequenceNumber();
  return metadataManager.getStore().getUpdatesSince(sinceSequenceNumber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@
import org.apache.hadoop.security.token.Token;

import com.google.common.collect.Lists;

import org.apache.hadoop.utils.db.DBUpdatesWrapper;
import org.apache.hadoop.utils.db.SequenceNumberNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -298,6 +301,11 @@ public OMResponse handle(OMRequest request) {
request.getServiceListRequest());
responseBuilder.setServiceListResponse(serviceListResponse);
break;
case DBUpdates:
DBUpdatesResponse dbUpdatesResponse = getOMDBUpdates(
request.getDbUpdatesRequest());
responseBuilder.setDbUpdatesResponse(dbUpdatesResponse);
break;
case GetDelegationToken:
GetDelegationTokenResponseProto getDtResp = getDelegationToken(
request.getGetDelegationTokenRequest());
Expand Down Expand Up @@ -377,6 +385,21 @@ public OMResponse handle(OMRequest request) {
return responseBuilder.build();
}

/**
 * Translate the DB updates returned by the OM into a proto response.
 * @param dbUpdatesRequest request carrying the starting sequence number.
 * @return proto response with the write-batch payloads and the latest
 *         sequence number they cover.
 * @throws SequenceNumberNotFoundException if the OM DB cannot serve updates
 *         from the requested sequence number.
 */
private DBUpdatesResponse getOMDBUpdates(
    DBUpdatesRequest dbUpdatesRequest)
    throws SequenceNumberNotFoundException {

  DBUpdatesResponse.Builder builder = DBUpdatesResponse
      .newBuilder();
  DBUpdatesWrapper dbUpdatesWrapper =
      impl.getDBUpdates(dbUpdatesRequest);
  // Use addData, not setData(i, ...): setData on an index >= the current
  // size of an (initially empty) repeated field throws
  // IndexOutOfBoundsException.
  for (byte[] data : dbUpdatesWrapper.getData()) {
    builder.addData(OMPBHelper.getByteString(data));
  }
  // sequenceNumber is a required proto field; build() fails with
  // UninitializedMessageException if it is left unset.
  builder.setSequenceNumber(dbUpdatesWrapper.getCurrentSequenceNumber());
  return builder.build();
}

private GetAclResponse getAcl(GetAclRequest req) throws IOException {
List<OzoneAclInfo> acls = new ArrayList<>();

Expand Down