```diff
@@ -428,11 +428,11 @@ public String getDeletingBlockKeyPrefix() {
 
   public KeyPrefixFilter getUnprefixedKeyFilter() {
     String schemaPrefix = containerPrefix();
-    return new KeyPrefixFilter().addFilter(schemaPrefix + "#", true);
+    return KeyPrefixFilter.newFilter(schemaPrefix + "#", true);
   }
 
   public KeyPrefixFilter getDeletingBlockKeyFilter() {
-    return new KeyPrefixFilter().addFilter(getDeletingBlockKeyPrefix());
+    return KeyPrefixFilter.newFilter(getDeletingBlockKeyPrefix());
  }
 
   /**
```
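A minimal sketch of the construction change at these call sites (not from this PR; the prefix value and import path are assumptions for illustration):

```java
import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;

public final class FilterConstructionSketch {
  public static void main(String[] args) {
    String schemaPrefix = "|1|"; // hypothetical container/schema prefix
    // Before: new KeyPrefixFilter().addFilter(schemaPrefix + "#", true);
    // After: one static factory call; the boolean still means "negative",
    // i.e. keys carrying this prefix are rejected.
    KeyPrefixFilter filter = KeyPrefixFilter.newFilter(schemaPrefix + "#", true);
    System.out.println(filter.filterKey("|1|#deleting#7".getBytes())); // false: prefix matches
    System.out.println(filter.filterKey("|1|7".getBytes()));           // true: prefix absent
  }
}
```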
```diff
@@ -310,10 +310,10 @@ private static void populateContainerMetadata(
       LOG.warn("Missing pendingDeleteBlockCount from {}: recalculate them from block table", metadataTable.getName());
       MetadataKeyFilters.KeyPrefixFilter filter =
           kvContainerData.getDeletingBlockKeyFilter();
-      blockPendingDeletion = store.getBlockDataTable()
-          .getSequentialRangeKVs(kvContainerData.startKeyEmpty(),
-              Integer.MAX_VALUE, kvContainerData.containerPrefix(),
-              filter).size();
+      blockPendingDeletion = store.getBlockDataTable().getRangeKVs(
+          kvContainerData.startKeyEmpty(), Integer.MAX_VALUE, kvContainerData.containerPrefix(), filter, true)
+          // TODO: add a count() method to avoid creating a list
+          .size();
     }
     // Set delete transaction id.
     Long delTxnId =
```
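On the inline `TODO` above: a hypothetical shape for counting matches without materializing a `List` (nothing like this exists in the PR; `string2Bytes` and the import paths are taken from the other hunks and are assumptions here):

```java
import java.util.Iterator;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.hdds.utils.db.Table;

/** Hypothetical helper: count keys accepted by a filter instead of listing them. */
public final class RangeCountSketch {
  private RangeCountSketch() { }

  public static <V> long count(Iterator<? extends Table.KeyValue<String, V>> entries,
      KeyPrefixFilter filter, boolean isSequential) {
    long matches = 0;
    while (entries.hasNext()) {
      Table.KeyValue<String, V> kv = entries.next();
      if (filter.filterKey(StringUtils.string2Bytes(kv.getKey()))) {
        matches++;
      } else if (isSequential) {
        break; // matching keys are contiguous, so stop at the first miss
      }
    }
    return matches;
  }
}
```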
```diff
@@ -351,10 +351,8 @@ public List<BlockData> listBlock(Container container, long startLocalID, int
       result = new ArrayList<>();
       String startKey = (startLocalID == -1) ? cData.startKeyEmpty()
           : cData.getBlockKey(startLocalID);
-      List<Table.KeyValue<String, BlockData>> range =
-          db.getStore().getBlockDataTable()
-              .getSequentialRangeKVs(startKey, count,
-                  cData.containerPrefix(), cData.getUnprefixedKeyFilter());
+      final List<Table.KeyValue<String, BlockData>> range = db.getStore().getBlockDataTable().getRangeKVs(
+          startKey, count, cData.containerPrefix(), cData.getUnprefixedKeyFilter(), true);
       for (Table.KeyValue<String, BlockData> entry : range) {
         result.add(db.getStore().getCompleteBlockData(entry.getValue(), null, entry.getKey()));
       }
```
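Every `getSequentialRangeKVs` caller in this PR migrates the same way; a minimal sketch of the new call shape (not from this PR; it assumes `isSequential=true` reproduces the old early-exit scan behaviour):

```java
import java.util.List;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.hdds.utils.db.CodecException;
import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
import org.apache.hadoop.hdds.utils.db.Table;

/** Sketch of the migrated call shape for any String-keyed datanode table. */
public final class GetRangeKVsMigration {
  private GetRangeKVsMigration() { }

  public static <V> List<Table.KeyValue<String, V>> sequentialRange(
      Table<String, V> table, String startKey, int count, String prefix, KeyPrefixFilter filter)
      throws RocksDatabaseException, CodecException {
    // Old: table.getSequentialRangeKVs(startKey, count, prefix, filter);
    // New: one method, a single KeyPrefixFilter, and an explicit flag.
    return table.getRangeKVs(startKey, count, prefix, filter, true);
  }
}
```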
```diff
@@ -181,10 +181,8 @@ public ContainerBackgroundTaskResult deleteViaSchema1(
 
     // # of blocks to delete is throttled
     KeyPrefixFilter filter = containerData.getDeletingBlockKeyFilter();
-    List<Table.KeyValue<String, BlockData>> toDeleteBlocks = blockDataTable
-        .getSequentialRangeKVs(containerData.startKeyEmpty(),
-            (int) blocksToDelete, containerData.containerPrefix(),
-            filter);
+    final List<Table.KeyValue<String, BlockData>> toDeleteBlocks = blockDataTable.getRangeKVs(
+        containerData.startKeyEmpty(), (int) blocksToDelete, containerData.containerPrefix(), filter, true);
     if (toDeleteBlocks.isEmpty()) {
       LOG.debug("No under deletion block found in container : {}",
           containerData.getContainerID());
```
```diff
@@ -196,9 +196,8 @@ protected static void checkTableStatus(Table<?, ?> table, String name)
 
   /**
    * Block Iterator for KeyValue Container. This block iterator returns blocks
-   * which match with the {@link org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter}. If no
-   * filter is specified, then default filter used is
-   * {@link org.apache.hadoop.hdds.utils.MetadataKeyFilters#getUnprefixedKeyFilter()}
+   * which match with the {@link KeyPrefixFilter}.
+   * The default filter is {@link #DEFAULT_BLOCK_FILTER}.
    */
   @InterfaceAudience.Public
   public static class KeyValueBlockIterator implements
@@ -263,7 +262,7 @@ public boolean hasNext() throws IOException {
       while (blockIterator.hasNext()) {
         Table.KeyValue<String, BlockData> keyValue = blockIterator.next();
         byte[] keyBytes = StringUtils.string2Bytes(keyValue.getKey());
-        if (blockFilter.filterKey(null, keyBytes, null)) {
+        if (blockFilter.filterKey(keyBytes)) {
           nextBlock = keyValue.getValue();
           if (LOG.isTraceEnabled()) {
             LOG.trace("Block matching with filter found: blockID is : {} for " +
@@ -296,9 +295,7 @@ public void close() throws IOException {
   /**
    * Block localId Iterator for KeyValue Container.
    * This Block localId iterator returns localIds
-   * which match with the {@link org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter}. If no
-   * filter is specified, then default filter used is
-   * {@link org.apache.hadoop.hdds.utils.MetadataKeyFilters#getUnprefixedKeyFilter()}
+   * which match with the {@link KeyPrefixFilter}.
    */
   @InterfaceAudience.Public
   public static class KeyValueBlockLocalIdIterator implements
@@ -351,7 +348,7 @@ public boolean hasNext() throws IOException {
       while (blockLocalIdIterator.hasNext()) {
         Table.KeyValue<String, Long> keyValue = blockLocalIdIterator.next();
         byte[] keyBytes = StringUtils.string2Bytes(keyValue.getKey());
-        if (localIdFilter.filterKey(null, keyBytes, null)) {
+        if (localIdFilter.filterKey(keyBytes)) {
           nextLocalId = keyValue.getValue();
           if (LOG.isTraceEnabled()) {
             LOG.trace("Block matching with filter found: LocalID is : " +
```
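With the unused `preKey`/`nextKey` parameters dropped, `MetadataKeyFilter` is left with a single abstract method, so one-off filters can be written as lambdas. A sketch (not from this PR; assumes the nested interface is public as the hunk context shows):

```java
import org.apache.hadoop.hdds.utils.MetadataKeyFilters.MetadataKeyFilter;

public final class LambdaFilterSketch {
  public static void main(String[] args) {
    // filterKey(byte[]) is now the only abstract method on the interface.
    MetadataKeyFilter startsWithHash =
        key -> key != null && key.length > 0 && key[0] == '#';
    System.out.println(startsWithHash.filterKey(new byte[] {'#', '1'})); // true
    System.out.println(startsWithHash.filterKey(new byte[] {'1'}));      // false
  }
}
```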
```diff
@@ -29,7 +29,7 @@
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutFeature;
-import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
+import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.Codec;
 import org.apache.hadoop.hdds.utils.db.FixedLengthStringCodec;
@@ -89,21 +89,20 @@ public BlockIterator<BlockData> getBlockIterator(long containerID)
     return new KeyValueBlockIterator(containerID,
         getBlockDataTableWithIterator()
             .iterator(getContainerKeyPrefix(containerID)),
-        new MetadataKeyFilters.KeyPrefixFilter().addFilter(
-            getContainerKeyPrefix(containerID) + "#", true));
+        KeyPrefixFilter.newFilter(getContainerKeyPrefix(containerID) + "#", true));
   }
 
   @Override
-  public BlockIterator<BlockData> getBlockIterator(long containerID,
-      MetadataKeyFilters.KeyPrefixFilter filter) throws IOException {
+  public BlockIterator<BlockData> getBlockIterator(long containerID, KeyPrefixFilter filter)
+      throws IOException {
     return new KeyValueBlockIterator(containerID,
         getBlockDataTableWithIterator()
             .iterator(getContainerKeyPrefix(containerID)), filter);
   }
 
   @Override
-  public BlockIterator<Long> getFinalizeBlockIterator(long containerID,
-      MetadataKeyFilters.KeyPrefixFilter filter) throws IOException {
+  public BlockIterator<Long> getFinalizeBlockIterator(long containerID, KeyPrefixFilter filter)
+      throws IOException {
     return new KeyValueBlockLocalIdIterator(containerID,
         getFinalizeBlocksTableWithIterator().iterator(getContainerKeyPrefix(containerID)), filter);
   }
```
```diff
@@ -19,7 +19,7 @@
 
 import java.io.File;
 import java.util.List;
-import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
+import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.CodecException;
 import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
@@ -111,18 +111,9 @@ public VALUE getReadCopy(KEY key) throws RocksDatabaseException, CodecException
 
   @Override
   public List<KeyValue<KEY, VALUE>> getRangeKVs(
-      KEY startKey, int count, KEY prefix,
-      MetadataKeyFilters.MetadataKeyFilter... filters)
-      throws RocksDatabaseException, CodecException {
-    return table.getRangeKVs(startKey, count, prefix, filters);
-  }
-
-  @Override
-  public List<KeyValue<KEY, VALUE>> getSequentialRangeKVs(
-      KEY startKey, int count, KEY prefix,
-      MetadataKeyFilters.MetadataKeyFilter... filters)
-      throws RocksDatabaseException, CodecException {
-    return table.getSequentialRangeKVs(startKey, count, prefix, filters);
+      KEY startKey, int count, KEY prefix, KeyPrefixFilter filter, boolean isSequential)
+      throws RocksDatabaseException, CodecException {
+    return table.getRangeKVs(startKey, count, prefix, filter, isSequential);
   }
 
   @Override
```
```diff
@@ -19,7 +19,7 @@
 
 import java.util.List;
 import java.util.stream.Collectors;
-import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
+import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.CodecException;
 import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
@@ -46,6 +46,7 @@
 public class SchemaOneDeletedBlocksTable extends DatanodeTable<String,
     ChunkInfoList> {
   public static final String DELETED_KEY_PREFIX = "#deleted#";
+  private static final KeyPrefixFilter DELETED_FILTER = KeyPrefixFilter.newFilter(DELETED_KEY_PREFIX);
 
   public SchemaOneDeletedBlocksTable(Table<String, ChunkInfoList> table) {
     super(table);
@@ -99,28 +100,12 @@ public ChunkInfoList getReadCopy(String key) throws RocksDatabaseException, Code
 
   @Override
   public List<KeyValue<String, ChunkInfoList>> getRangeKVs(
-      String startKey, int count, String prefix,
-      MetadataKeyFilters.MetadataKeyFilter... filters)
-      throws RocksDatabaseException, CodecException {
-
-    // Deleted blocks will always have the #deleted# key prefix and nothing
-    // else in this schema version. Ignore any user passed prefixes that could
-    // collide with this and return results that are not deleted blocks.
-    return unprefix(super.getRangeKVs(prefix(startKey), count,
-        prefix, getDeletedFilter()));
-  }
-
-  @Override
-  public List<KeyValue<String, ChunkInfoList>> getSequentialRangeKVs(
-      String startKey, int count, String prefix,
-      MetadataKeyFilters.MetadataKeyFilter... filters)
-      throws RocksDatabaseException, CodecException {
-
+      String startKey, int count, String prefix, KeyPrefixFilter filter, boolean isSequential)
+      throws RocksDatabaseException, CodecException {
     // Deleted blocks will always have the #deleted# key prefix and nothing
     // else in this schema version. Ignore any user passed prefixes that could
     // collide with this and return results that are not deleted blocks.
-    return unprefix(super.getSequentialRangeKVs(prefix(startKey), count,
-        prefix, getDeletedFilter()));
+    return unprefix(super.getRangeKVs(prefix(startKey), count, prefix, DELETED_FILTER, isSequential));
   }
 
   private static String prefix(String key) {
@@ -148,9 +133,4 @@ private static List<KeyValue<String, ChunkInfoList>> unprefix(
         .map(kv -> Table.newKeyValue(unprefix(kv.getKey()), kv.getValue()))
         .collect(Collectors.toList());
   }
-
-  private static MetadataKeyFilters.KeyPrefixFilter getDeletedFilter() {
-    return (new MetadataKeyFilters.KeyPrefixFilter())
-        .addFilter(DELETED_KEY_PREFIX);
-  }
 }
```
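One thing worth flagging here (an observation, not something the PR addresses): `KeyPrefixFilter` keeps per-instance `keysScanned`/`keysHinted` counters that `filterKey` increments, so the shared static `DELETED_FILTER` accumulates those stats across all calls and threads. If the scan/hint counts are ever read for this table, a fresh filter per call would keep them scoped; a sketch:

```java
import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;

public final class PerCallFilterSketch {
  private PerCallFilterSketch() { }

  // Alternative to a shared static: build a fresh filter per range scan so
  // the scan/hint counters reflect only that scan. Assumes KeyPrefixFilter
  // overrides getKeysScannedNum() to expose its keysScanned field.
  static KeyPrefixFilter freshDeletedFilter() {
    return KeyPrefixFilter.newFilter("#deleted#");
  }

  public static void main(String[] args) {
    KeyPrefixFilter filter = freshDeletedFilter();
    filter.filterKey("#deleted#block1".getBytes());
    System.out.println(filter.getKeysScannedNum()); // 1: isolated to this instance
  }
}
```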
```diff
@@ -38,7 +38,7 @@
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
+import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -305,16 +305,14 @@ public void testKeyValueBlockIteratorWithAdvancedFilter(
 
     // Test arbitrary filter.
     String schemaPrefix = containerData.containerPrefix();
-    MetadataKeyFilters.KeyPrefixFilter secondFilter =
-        new MetadataKeyFilters.KeyPrefixFilter()
-            .addFilter(schemaPrefix + secondPrefix);
+    final KeyPrefixFilter secondFilter = KeyPrefixFilter.newFilter(schemaPrefix + secondPrefix);
     testWithFilter(secondFilter, blockIDs.get(secondPrefix));
   }
 
   /**
    * Helper method to run some iterator tests with a provided filter.
    */
-  private void testWithFilter(MetadataKeyFilters.KeyPrefixFilter filter,
+  private void testWithFilter(KeyPrefixFilter filter,
       List<Long> expectedIDs) throws Exception {
     try (BlockIterator<BlockData> iterator =
         db.getStore().getBlockIterator(CONTAINER_ID, filter)) {
```
```diff
@@ -54,12 +54,10 @@ public interface MetadataKeyFilter {
     /**
      * Filter levelDB key with a certain condition.
      *
-     * @param preKey previous key.
      * @param currentKey current key.
-     * @param nextKey next key.
      * @return true if a certain condition satisfied, return false otherwise.
      */
-    boolean filterKey(byte[] preKey, byte[] currentKey, byte[] nextKey);
+    boolean filterKey(byte[] currentKey);
 
     default int getKeysScannedNum() {
       return 0;
@@ -74,28 +72,13 @@
    * Utility class to filter key by a string prefix. This filter
    * assumes keys can be parsed to a string.
    */
-  public static class KeyPrefixFilter implements MetadataKeyFilter {
-
+  public static final class KeyPrefixFilter implements MetadataKeyFilter {
     private List<String> positivePrefixList = new ArrayList<>();
     private List<String> negativePrefixList = new ArrayList<>();
-    private boolean atleastOnePositiveMatch;
     private int keysScanned = 0;
     private int keysHinted = 0;
 
-    public KeyPrefixFilter() { }
-
-    /**
-     * KeyPrefixFilter constructor. It is made of positive and negative prefix
-     * list. PositivePrefixList is the list of prefixes which are accepted
-     * whereas negativePrefixList contains the list of prefixes which are
-     * rejected.
-     *
-     * @param atleastOnePositiveMatch if positive it requires key to be accepted
-     * by atleast one positive filter.
-     */
-    public KeyPrefixFilter(boolean atleastOnePositiveMatch) {
-      this.atleastOnePositiveMatch = atleastOnePositiveMatch;
-    }
+    private KeyPrefixFilter() { }
 
     public KeyPrefixFilter addFilter(String keyPrefix) {
       addFilter(keyPrefix, false);
@@ -129,8 +112,7 @@ public KeyPrefixFilter addFilter(String keyPrefix, boolean negative) {
     }
 
     @Override
-    public boolean filterKey(byte[] preKey, byte[] currentKey,
-        byte[] nextKey) {
+    public boolean filterKey(byte[] currentKey) {
       keysScanned++;
       if (currentKey == null) {
         return false;
@@ -150,8 +132,6 @@ public boolean filterKey(byte[] preKey, byte[] currentKey,
       if (accept) {
         keysHinted++;
         return true;
-      } else if (atleastOnePositiveMatch) {
-        return false;
       }
 
       accept = !negativePrefixList.isEmpty() && negativePrefixList.stream()
@@ -190,5 +170,13 @@ private static boolean prefixMatch(byte[] prefix, byte[] key) {
       }
       return true;
     }
+
+    public static KeyPrefixFilter newFilter(String prefix) {
+      return newFilter(prefix, false);
+    }
+
+    public static KeyPrefixFilter newFilter(String prefix, boolean negative) {
+      return new KeyPrefixFilter().addFilter(prefix, negative);
+    }
   }
 }
```
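Usage sketch for the new factories (not from this PR): `newFilter` covers the common single-prefix case, and since the no-arg constructor is now private, multi-prefix filters are built by chaining `addFilter` on a factory-created instance.

```java
import org.apache.hadoop.hdds.utils.MetadataKeyFilters.KeyPrefixFilter;

public final class NewFilterUsageSketch {
  public static void main(String[] args) {
    // Single positive prefix: accept only keys under #deleting#.
    KeyPrefixFilter deleting = KeyPrefixFilter.newFilter("#deleting#");

    // Single negative prefix: reject keys under #deleted#, accept the rest.
    KeyPrefixFilter notDeleted = KeyPrefixFilter.newFilter("#deleted#", true);

    // Multiple prefixes: chain addFilter, as before, on the factory result.
    KeyPrefixFilter either = KeyPrefixFilter.newFilter("#deleting#")
        .addFilter("#deleted#");

    System.out.println(deleting.filterKey("#deleting#1".getBytes()));  // true
    System.out.println(notDeleted.filterKey("#deleted#1".getBytes())); // false
    System.out.println(either.filterKey("#deleted#1".getBytes()));     // true
  }
}
```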