diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/BooleanCodec.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/BooleanCodec.java index 4ce44e41acee..0aa894a0bdef 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/BooleanCodec.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/BooleanCodec.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdds.utils.db; import jakarta.annotation.Nonnull; +import java.util.Comparator; +import java.util.Objects; /** * Codec to serialize/deserialize {@link Boolean}. @@ -27,6 +29,7 @@ public final class BooleanCodec implements Codec { private static final byte TRUE = 1; private static final byte FALSE = 0; private static final BooleanCodec INSTANCE = new BooleanCodec(); + private static final Comparator COMPARATOR = (o1, o2) -> Objects.compare(o1, o2, Boolean::compare); public static BooleanCodec get() { return INSTANCE; @@ -75,4 +78,9 @@ public Boolean fromPersistedFormat(byte[] rawData) { public Boolean copyObject(Boolean object) { return object; } + + @Override + public Comparator comparator() { + return COMPARATOR; + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Codec.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Codec.java index be75ea32d0b5..36d21eae68f6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Codec.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/Codec.java @@ -19,6 +19,9 @@ import jakarta.annotation.Nonnull; import java.io.IOException; +import java.io.Serializable; +import java.util.Comparator; +import java.util.Objects; /** * Codec interface to serialize/deserialize objects to/from bytes. @@ -29,6 +32,7 @@ */ public interface Codec { byte[] EMPTY_BYTE_ARRAY = {}; + ByteArrayComparator COMPARATOR = new ByteArrayComparator(); /** @return the class of the {@link T}. */ Class getTypeClass(); @@ -112,4 +116,36 @@ default T fromCodecBuffer(@Nonnull CodecBuffer buffer) throws IOException { * the returned object can possibly be the same as the given object. */ T copyObject(T object); + + /** + * @return Comparator for the given type of object. + */ + default Comparator comparator() { + return (o1, o2) -> { + try { + byte[] a1 = o1 == null ? null : toPersistedFormat(o1); + byte[] a2 = o2 == null ? null : toPersistedFormat(o2); + return Objects.compare(a1, a2, COMPARATOR); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + } + + /** + * Comparator to compare 2 byte arrays based on their relative lexicographical ordering. 
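+ * Bytes are compared as unsigned values; when one array is a prefix of the other, the shorter array sorts first (for example, {@code {0x01}} sorts before {@code {0xFF}} and before {@code {0x01, 0x00}}).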
+ */ + class ByteArrayComparator implements Comparator, Serializable { + @Override + public int compare(byte[] o1, byte[] o2) { + int len = Math.min(o1.length, o2.length); + for (int i = 0; i < len; i++) { + int cmp = Byte.toUnsignedInt(o1[i]) - Byte.toUnsignedInt(o2[i]); + if (cmp != 0) { + return cmp; + } + } + return Integer.compare(o1.length, o2.length); + } + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/DelegatedCodec.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/DelegatedCodec.java index c0c6cb4a6972..75e06b321ef6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/DelegatedCodec.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/DelegatedCodec.java @@ -19,6 +19,9 @@ import jakarta.annotation.Nonnull; import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Comparator; +import java.util.Objects; import org.apache.ratis.util.function.CheckedFunction; /** @@ -128,4 +131,18 @@ public T copyObject(T message) { throw new IllegalStateException("Failed to copyObject", e); } } + + @Override + public Comparator comparator() { + return (o1, o2) -> { + try { + DELEGATE d1 = o1 == null ? null : backward.apply(o1); + DELEGATE d2 = o2 == null ? null : backward.apply(o2); + return Objects.compare(d1, d2, delegate.comparator()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + }; + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/IntegerCodec.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/IntegerCodec.java index bcd8875df871..6d13aeb3696e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/IntegerCodec.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/IntegerCodec.java @@ -19,6 +19,8 @@ import jakarta.annotation.Nonnull; import java.nio.ByteBuffer; +import java.util.Comparator; +import java.util.Objects; /** * Codec to serialize/deserialize {@link Integer}. @@ -26,6 +28,7 @@ public final class IntegerCodec implements Codec { private static final IntegerCodec INSTANCE = new IntegerCodec(); + private static final Comparator COMPARATOR = (o1, o2) -> Objects.compare(o1, o2, Integer::compare); public static IntegerCodec get() { return INSTANCE; @@ -70,4 +73,9 @@ public Integer fromPersistedFormat(byte[] rawData) { public Integer copyObject(Integer object) { return object; } + + @Override + public Comparator comparator() { + return COMPARATOR; + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/LongCodec.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/LongCodec.java index 3d4044bfb40f..d991351973d5 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/LongCodec.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/LongCodec.java @@ -19,12 +19,15 @@ import jakarta.annotation.Nonnull; import java.nio.ByteBuffer; +import java.util.Comparator; +import java.util.Objects; /** * Codec to serialize/deserialize {@link Long}. 
*/ public final class LongCodec implements Codec { private static final LongCodec CODEC = new LongCodec(); + private static final Comparator COMPARATOR = (o1, o2) -> Objects.compare(o1, o2, Long::compare); public static LongCodec get() { return CODEC; @@ -74,4 +77,9 @@ public Long fromPersistedFormat(byte[] rawData) { public Long copyObject(Long object) { return object; } + + @Override + public Comparator comparator() { + return COMPARATOR; + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/ShortCodec.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/ShortCodec.java index ed46cb5c5510..4b8c153a8188 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/ShortCodec.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/ShortCodec.java @@ -19,6 +19,8 @@ import jakarta.annotation.Nonnull; import java.nio.ByteBuffer; +import java.util.Comparator; +import java.util.Objects; /** * Codec to serialize/deserialize {@link Short}. @@ -26,6 +28,7 @@ public final class ShortCodec implements Codec { private static final ShortCodec INSTANCE = new ShortCodec(); + private static final Comparator COMPARATOR = (o1, o2) -> Objects.compare(o1, o2, Short::compare); public static ShortCodec get() { return INSTANCE; @@ -71,4 +74,9 @@ public Short fromPersistedFormat(byte[] rawData) { public Short copyObject(Short object) { return object; } + + @Override + public Comparator comparator() { + return COMPARATOR; + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodecBase.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodecBase.java index 675ac4fcfecb..da9a1c648820 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodecBase.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/StringCodecBase.java @@ -26,6 +26,7 @@ import java.nio.charset.CharsetEncoder; import java.nio.charset.CoderResult; import java.nio.charset.CodingErrorAction; +import java.util.Comparator; import java.util.Objects; import java.util.function.Function; import org.apache.hadoop.hdds.StringUtils; @@ -39,6 +40,7 @@ */ abstract class StringCodecBase implements Codec { static final Logger LOG = LoggerFactory.getLogger(StringCodecBase.class); + private static final Comparator COMPARATOR = (o1, o2) -> Objects.compare(o1, o2, String::compareTo); private final Charset charset; private final boolean fixedLength; @@ -197,4 +199,9 @@ public String fromPersistedFormat(byte[] bytes) { public String copyObject(String object) { return object; } + + @Override + public Comparator comparator() { + return COMPARATOR; + } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 5dc4e15fc0d6..f154a510a219 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -197,6 +197,10 @@ public final class OzoneConfigKeys { public static final String OZONE_CLIENT_EC_GRPC_WRITE_TIMEOUT = "ozone.client.ec.grpc.write.timeout"; public static final String OZONE_CLIENT_EC_GRPC_WRITE_TIMEOUT_DEFAULT = "30s"; + public static final String OZONE_PARALLEL_DB_ITERATORS_CORE_POOL_SIZE = "ozone.db.parallel.iterators.core.pool.size"; + public static final int OZONE_PARALLEL_DB_ITERATORS_CORE_POOL_SIZE_DEFAULT = 0; + public static 
final String OZONE_PARALLEL_DB_ITERATORS_MAX_POOL_SIZE = "ozone.db.parallel.iterators.max.pool.size"; + public static final int OZONE_PARALLEL_DB_ITERATORS_MAX_POOL_SIZE_DEFAULT = 10; /** diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index e2a7b293556f..e2166a105edc 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -3963,6 +3963,14 @@ OM grpc server netty boss event group size. + + ozone.om.snapshot.db.parallel.iterator.pool.size + 1 + OZONE, OM + + Max iterator pool size for parallely iterating through DB. + + ozone.om.grpc.workergroup.size diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java index 94b8b0569978..41a90b0a6484 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeTable.java @@ -20,10 +20,13 @@ import java.io.File; import java.io.IOException; import java.util.List; +import java.util.concurrent.ExecutionException; import org.apache.hadoop.hdds.utils.MetadataKeyFilters; import org.apache.hadoop.hdds.utils.db.BatchOperation; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.ratis.util.function.CheckedFunction; +import org.slf4j.Logger; /** * Wrapper class to represent a table in a datanode RocksDB instance. @@ -89,6 +92,14 @@ public void deleteWithBatch(BatchOperation batch, KEY key) "version."); } + @Override + public void parallelTableOperation( + KEY startKey, KEY endKey, CheckedFunction, Void, IOException> operation, + Logger logger, int logPercentageThreshold) + throws IOException, ExecutionException, InterruptedException { + table.parallelTableOperation(startKey, endKey, operation, logger, logPercentageThreshold); + } + @Override public String getName() throws IOException { return table.getName(); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/ContainerImplTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/ContainerImplTestUtils.java index 1e27e748d69c..ac1a2ceb9f8f 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/ContainerImplTestUtils.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/ContainerImplTestUtils.java @@ -19,6 +19,7 @@ import java.time.Clock; import org.apache.hadoop.hdds.utils.db.InMemoryTestTable; +import org.apache.hadoop.hdds.utils.db.LongCodec; /** * Helper utility to test container impl. 
@@ -33,10 +34,10 @@ public static ContainerSet newContainerSet() { } public static ContainerSet newContainerSet(long recoveringTimeout) { - return ContainerSet.newRwContainerSet(new InMemoryTestTable<>(), recoveringTimeout); + return ContainerSet.newRwContainerSet(new InMemoryTestTable<>(LongCodec.get()), recoveringTimeout); } public static ContainerSet newContainerSet(long recoveringTimeout, Clock clock) { - return new ContainerSet(new InMemoryTestTable<>(), recoveringTimeout, clock); + return new ContainerSet(new InMemoryTestTable<>(LongCodec.get()), recoveringTimeout, clock); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BaseRDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BaseRDBTable.java new file mode 100644 index 000000000000..7ff912b3df66 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BaseRDBTable.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import java.io.IOException; +import java.util.List; +import org.rocksdb.LiveFileMetaData; + +/** + * Base table interface for Rocksdb. + */ +public interface BaseRDBTable extends Table { + List getTableSstFiles() throws IOException; +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java index 4d83acba39ef..e361dd94407e 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java @@ -210,6 +210,11 @@ DBUpdatesWrapper getUpdatesSince(long sequenceNumber) DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount) throws IOException; + /** + * Gets the executor pool parallel table iterator. + */ + ThrottledThreadpoolExecutor getParallelTableIteratorPool(); + /** * Return if the underlying DB is closed. This call is thread safe. * @return true if the DB is closed. diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java index c6f9e1c7cf1b..354ec728b881 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java @@ -106,6 +106,7 @@ public final class DBStoreBuilder { // number in request to avoid increase in heap memory. 
private long maxDbUpdatesSizeThreshold; private Integer maxNumberOfOpenFiles = null; + private int parallelTableIteratorMaxPoolSize; /** * Create DBStoreBuilder from a generic DBDefinition. @@ -156,8 +157,11 @@ private DBStoreBuilder(ConfigurationSource configuration, this.maxDbUpdatesSizeThreshold = (long) configuration.getStorageSize( OZONE_OM_DELTA_UPDATE_DATA_SIZE_MAX_LIMIT, OZONE_OM_DELTA_UPDATE_DATA_SIZE_MAX_LIMIT_DEFAULT, StorageUnit.BYTES); + this.parallelTableIteratorMaxPoolSize = rocksDBConfiguration.getParallelIteratorMaxPoolSize(); } + + public static File getDBDirPath(DBDefinition definition, ConfigurationSource configuration) { // Set metadata dirs. @@ -201,6 +205,10 @@ private void setDBOptionsProps(ManagedDBOptions dbOptions) { } } + public void setParallelTableIteratorMaxPoolSize(int parallelTableIteratorMaxPoolSize) { + this.parallelTableIteratorMaxPoolSize = parallelTableIteratorMaxPoolSize; + } + /** * Builds a DBStore instance and returns that. * @@ -227,11 +235,13 @@ public DBStore build() throws IOException { if (!dbFile.getParentFile().exists()) { throw new IOException("The DB destination directory should exist."); } - + if (this.parallelTableIteratorMaxPoolSize == 0) { + throw new IOException("The parallel table iterator max pool size cannot be zero."); + } return new RDBStore(dbFile, rocksDBOption, statistics, writeOptions, tableConfigs, registry.build(), openReadOnly, dbJmxBeanNameName, enableCompactionDag, maxDbUpdatesSizeThreshold, createCheckpointDirs, configuration, - enableRocksDbMetrics); + enableRocksDbMetrics, this.parallelTableIteratorMaxPoolSize); } finally { tableConfigs.forEach(TableConfig::close); } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/ParallelTableOperator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/ParallelTableOperator.java new file mode 100644 index 000000000000..1787f27d47b8 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/ParallelTableOperator.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ratis.util.function.CheckedFunction; +import org.slf4j.Logger; + +/** + * Class to iterate through a table in parallel by breaking table into multiple iterators for RDB store. 
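+ * Subclasses supply the split points via {@link #getBounds}; each adjacent pair of bounds defines a sub-range [low, high) that is iterated by its own task on the shared {@link ThrottledThreadpoolExecutor}, with the upper bound excluded.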
+ */ +public abstract class ParallelTableOperator, K, V> { + private final TABLE table; + private final Codec keyCodec; + private final Comparator comparator; + private final ThrottledThreadpoolExecutor executor; + + public ParallelTableOperator(ThrottledThreadpoolExecutor throttledThreadpoolExecutor, + TABLE table, Codec keyCodec) { + this.executor = throttledThreadpoolExecutor; + this.table = table; + this.keyCodec = keyCodec; + this.comparator = keyCodec.comparator(); + } + + /** + * Provide all the bounds that fall in range [startKey, endKey] in a sorted list to facilitate efficiently + * splitting table iteration of keys into multiple parallel iterators of the table. + */ + protected abstract List getBounds(K startKey, K endKey) throws IOException; + + @SuppressWarnings("parameternumber") + private CompletableFuture submit( + CheckedFunction, Void, THROWABLE> keyOperation, K beg, K end, + AtomicLong keyCounter, AtomicLong prevLogCounter, long logCountThreshold, Logger log, + AtomicBoolean cancelled) throws InterruptedException { + return executor.submit(() -> { + try (TableIterator> iter = table.iterator()) { + if (beg != null) { + iter.seek(beg); + } else { + iter.seekToFirst(); + } + while (iter.hasNext() && !cancelled.get()) { + Table.KeyValue kv = iter.next(); + if (end == null || Objects.compare(kv.getKey(), end, comparator) < 0) { + keyOperation.apply(kv); + keyCounter.incrementAndGet(); + if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) { + log.info("Iterated through table : {} {} keys while performing task.", table.getName(), + keyCounter.get()); + prevLogCounter.set(keyCounter.get()); + } + } else { + break; + } + } + } + }); + } + + public void performTaskOnTableVals(K startKey, K endKey, + CheckedFunction, Void, THROWABLE> keyOperation, + Logger log, int logPercentageThreshold) + throws ExecutionException, InterruptedException, IOException, THROWABLE { + List bounds; + long logCountThreshold = Math.max((table.getEstimatedKeyCount() * logPercentageThreshold) / 100, 1L); + try { + bounds = getBounds(startKey, endKey); + } catch (IOException e) { + log.warn("Error while getting bounds Table: {} startKey: {}, endKey: {}", table.getName(), startKey, endKey, e); + bounds = Arrays.asList(startKey, endKey); + } + AtomicLong keyCounter = new AtomicLong(); + AtomicLong prevLogCounter = new AtomicLong(); + CompletableFuture iterFutures = CompletableFuture.completedFuture(null); + AtomicBoolean cancelled = new AtomicBoolean(false); + for (int idx = 1; idx < bounds.size(); idx++) { + K beg = bounds.get(idx - 1); + K end = bounds.get(idx); + if (cancelled.get()) { + break; + } + CompletableFuture future = submit(keyOperation, beg, end, keyCounter, prevLogCounter, + logCountThreshold, log, cancelled); + future.exceptionally((e -> { + cancelled.set(true); + return null; + })); + iterFutures = iterFutures.thenCombine(future, (v1, v2) -> null); + } + iterFutures.get(); + } + + protected TABLE getTable() { + return table; + } + + public Codec getKeyCodec() { + return keyCodec; + } + + public Comparator getComparator() { + return comparator; + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBParallelTableOperator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBParallelTableOperator.java new file mode 100644 index 000000000000..cdaa7f441e20 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBParallelTableOperator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import org.rocksdb.LiveFileMetaData; + +/** + * Class to iterate through a table in parallel by breaking table into multiple iterators for RDB store. + */ +public class RDBParallelTableOperator extends ParallelTableOperator, K, V> { + + public RDBParallelTableOperator(ThrottledThreadpoolExecutor throttledThreadpoolExecutor, + BaseRDBTable table, Codec keyCodec) { + super(throttledThreadpoolExecutor, table, keyCodec); + } + + @Override + protected List getBounds(K startKey, K endKey) throws IOException { + Set keys = new HashSet<>(); + for (LiveFileMetaData sstFile : this.getTable().getTableSstFiles()) { + keys.add(this.getKeyCodec().fromPersistedFormat(sstFile.smallestKey())); + keys.add(this.getKeyCodec().fromPersistedFormat(sstFile.largestKey())); + } + List boundKeys = new ArrayList<>(); + boundKeys.add(startKey); + boundKeys.addAll(keys.stream().sorted().filter(Objects::nonNull) + .filter(key -> startKey == null || getComparator().compare(key, startKey) > 0) + .filter(key -> endKey == null || getComparator().compare(endKey, key) > 0) + .collect(Collectors.toList())); + boundKeys.add(endKey); + return boundKeys; + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java index b44c2685caee..cf5d720fc870 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java @@ -74,6 +74,7 @@ public class RDBStore implements DBStore { private final long maxDbUpdatesSizeThreshold; private final ManagedDBOptions dbOptions; private final ManagedStatistics statistics; + private final ThrottledThreadpoolExecutor parallelTableIteratorPool; @SuppressWarnings("parameternumber") public RDBStore(File dbFile, ManagedDBOptions dbOptions, ManagedStatistics statistics, @@ -83,8 +84,8 @@ public RDBStore(File dbFile, ManagedDBOptions dbOptions, ManagedStatistics stati long maxDbUpdatesSizeThreshold, boolean createCheckpointDirs, ConfigurationSource configuration, - boolean enableRocksDBMetrics) - + boolean enableRocksDBMetrics, + int parallelTableIteratorMaxPoolSize) throws IOException { Preconditions.checkNotNull(dbFile, "DB file location cannot be null"); Preconditions.checkNotNull(families); @@ -94,7 +95,7 @@ public RDBStore(File dbFile, ManagedDBOptions dbOptions, ManagedStatistics stati dbLocation = dbFile; this.dbOptions = dbOptions; this.statistics = 
statistics; - + this.parallelTableIteratorPool = new ThrottledThreadpoolExecutor(parallelTableIteratorMaxPoolSize); try { if (enableCompactionDag) { rocksDBCheckpointDiffer = RocksDBCheckpointDifferHolder.getInstance( @@ -226,7 +227,6 @@ public void close() throws IOException { metrics.unregister(); metrics = null; } - RDBMetrics.unRegister(); IOUtils.close(LOG, checkPointManager); if (rocksDBCheckpointDiffer != null) { @@ -237,6 +237,10 @@ public void close() throws IOException { IOUtils.close(LOG, statistics); } IOUtils.close(LOG, db); + + if (parallelTableIteratorPool != null) { + IOUtils.close(LOG, parallelTableIteratorPool); + } } @Override @@ -290,34 +294,34 @@ public RDBTable getTable(String name) throws IOException { if (handle == null) { throw new IOException("No such table in this DB. TableName : " + name); } - return new RDBTable(this.db, handle, rdbMetrics); + return new RDBTable(db, handle, rdbMetrics, parallelTableIteratorPool); } @Override public TypedTable getTable(String name, Class keyType, Class valueType) throws IOException { return new TypedTable<>(getTable(name), codecRegistry, keyType, - valueType); + valueType, parallelTableIteratorPool); } @Override public TypedTable getTable( String name, Codec keyCodec, Codec valueCodec, TableCache.CacheType cacheType) throws IOException { - return new TypedTable<>(getTable(name), keyCodec, valueCodec, cacheType); + return new TypedTable<>(getTable(name), keyCodec, valueCodec, cacheType, parallelTableIteratorPool); } @Override public Table getTable(String name, Class keyType, Class valueType, TableCache.CacheType cacheType) throws IOException { - return new TypedTable<>(getTable(name), codecRegistry, keyType, valueType, cacheType); + return new TypedTable<>(getTable(name), codecRegistry, keyType, valueType, cacheType, parallelTableIteratorPool); } @Override public ArrayList
listTables() { ArrayList
returnList = new ArrayList<>(); for (ColumnFamily family : getColumnFamilies()) { - returnList.add(new RDBTable(db, family, rdbMetrics)); + returnList.add(new RDBTable(db, family, rdbMetrics, parallelTableIteratorPool)); } return returnList; } @@ -465,6 +469,11 @@ public DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount) return dbUpdatesWrapper; } + @Override + public ThrottledThreadpoolExecutor getParallelTableIteratorPool() { + return this.parallelTableIteratorPool; + } + @Override public boolean isClosed() { return db.isClosed(); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java index 5c5247e011a5..ac696f0794e6 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java @@ -17,16 +17,22 @@ package org.apache.hadoop.hdds.utils.db; +import static org.apache.hadoop.hdds.utils.db.RocksDatabase.bytes2String; + import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.hadoop.hdds.annotation.InterfaceAudience; import org.apache.hadoop.hdds.utils.MetadataKeyFilters; import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; +import org.apache.ratis.util.function.CheckedFunction; +import org.rocksdb.LiveFileMetaData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +42,7 @@ * metadata store content. All other user's using Table should use TypedTable. */ @InterfaceAudience.Private -class RDBTable implements Table { +class RDBTable implements BaseRDBTable { private static final Logger LOG = @@ -45,6 +51,7 @@ class RDBTable implements Table { private final RocksDatabase db; private final ColumnFamily family; private final RDBMetrics rdbMetrics; + private final RDBParallelTableOperator parallelTableOperator; /** * Constructs a TableStore. @@ -52,11 +59,11 @@ class RDBTable implements Table { * @param db - DBstore that we are using. * @param family - ColumnFamily Handle. 
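+ * @param executor - throttled executor shared by the store, used for parallel table iteration.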
*/ - RDBTable(RocksDatabase db, ColumnFamily family, - RDBMetrics rdbMetrics) { + RDBTable(RocksDatabase db, ColumnFamily family, RDBMetrics rdbMetrics, ThrottledThreadpoolExecutor executor) { this.db = db; this.family = family; this.rdbMetrics = rdbMetrics; + this.parallelTableOperator = new RDBParallelTableOperator<>(executor, this, ByteArrayCodec.get()); } public ColumnFamily getColumnFamily() { @@ -224,6 +231,14 @@ public TableIterator> iterator(byte[] prefix) prefix); } + @Override + public void parallelTableOperation(byte[] startKey, byte[] endKey, + CheckedFunction, Void, IOException> operation, + Logger logger, int logPercentageThreshold) + throws IOException, ExecutionException, InterruptedException { + this.parallelTableOperator.performTaskOnTableVals(startKey, endKey, operation, logger, logPercentageThreshold); + } + TableIterator> iterator( CodecBuffer prefix) throws IOException { return new RDBStoreCodecBufferIterator(db.newIterator(family, false), @@ -363,4 +378,11 @@ && get(startKey) == null) { } return result; } + + @Override + public List getTableSstFiles() throws IOException { + return this.db.getSstFileList().stream() + .filter(liveFileMetaData -> getName().equals(bytes2String(liveFileMetaData.columnFamilyName()))) + .collect(Collectors.toList()); + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDBConfiguration.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDBConfiguration.java index 74c8d95e670e..a1e3117f888b 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDBConfiguration.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDBConfiguration.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE; import static org.apache.hadoop.hdds.conf.ConfigTag.OM; +import static org.apache.hadoop.hdds.conf.ConfigTag.RECON; import static org.apache.hadoop.hdds.conf.ConfigTag.SCM; import org.apache.hadoop.hdds.conf.Config; @@ -84,6 +85,13 @@ public class RocksDBConfiguration { + "Default 0 means no limit.") private long walSizeLimit = 0; + @Config(key = "rocksdb.parallel.iterator.max.pool.size", + type = ConfigType.INT, + defaultValue = "10", + tags = {OM, SCM, DATANODE, RECON}, + description = "Max pool size for parallely iterating a rocksdb table in ozone") + private int parallelIteratorMaxPoolSize = 10; + public void setRocksdbLoggingEnabled(boolean enabled) { this.rocksdbLogEnabled = enabled; } @@ -139,4 +147,8 @@ public void setKeepLogFileNum(int fileNum) { public int getKeepLogFileNum() { return rocksdbKeepLogFileNum; } + + public int getParallelIteratorMaxPoolSize() { + return parallelIteratorMaxPoolSize; + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java index 5a07558cd546..9b91c528d60e 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/Table.java @@ -23,12 +23,15 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ExecutionException; import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.hdds.annotation.InterfaceStability; import org.apache.hadoop.hdds.utils.MetadataKeyFilters; import org.apache.hadoop.hdds.utils.TableCacheMetrics; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; import 
org.apache.hadoop.hdds.utils.db.cache.CacheValue; +import org.apache.ratis.util.function.CheckedFunction; +import org.slf4j.Logger; /** * Interface for key-value store that stores ozone metadata. Ozone metadata is @@ -173,6 +176,22 @@ default VALUE getReadCopy(KEY key) throws IOException { TableIterator> iterator(KEY prefix) throws IOException; + /** + * Performs an operation by iterating through the table in parallel, splitting the iteration across multiple threads. + * To be used where the order of keys does not matter while operating on the table's key-value pairs. + * @param startKey key to begin iteration from. When null, iteration starts from the smallest key in the + * table. + * @param endKey key at which to end iteration (exclusive). When null, iteration continues to the largest key in the + * table. + * @param operation operation to be performed on each key-value pair. + * @param logger Logger used to report iteration progress. + * @param logPercentageThreshold percentage, between 1 and 100, of the estimated key count used to decide how often + * iteration progress is logged. + */ + void parallelTableOperation( + KEY startKey, KEY endKey, CheckedFunction, Void, IOException> operation, + Logger logger, int logPercentageThreshold) + throws IOException, ExecutionException, InterruptedException; + /** * Returns the Name of this Table. * @return - Table Name. diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/ThrottledThreadpoolExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/ThrottledThreadpoolExecutor.java new file mode 100644 index 000000000000..35d3ec379179 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/ThrottledThreadpoolExecutor.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ratis.util.function.CheckedRunnable; + +/** + * Executor that throttles callers before submitting a runnable to an underlying ThreadPoolExecutor.
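+ * At most {@code 2 * maxNumberOfThreads} tasks may be in flight at once; {@link #submit} blocks the caller until a slot frees up.
+ * <p>A minimal usage sketch (hypothetical caller, exception handling omitted):
+ * <pre>{@code
+ * try (ThrottledThreadpoolExecutor executor = new ThrottledThreadpoolExecutor(4)) {
+ *   CompletableFuture<Void> future = executor.submit(() -> processKey()); // processKey() is a placeholder task
+ *   future.get();
+ * }
+ * }</pre>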
+ */ +public class ThrottledThreadpoolExecutor implements Closeable { + private final ThreadPoolExecutor pool; + private final AtomicInteger availableTaskSlots; + private final Object lock; + + public ThrottledThreadpoolExecutor(int maxNumberOfThreads) { + pool = new ThreadPoolExecutor(maxNumberOfThreads, maxNumberOfThreads, 5, TimeUnit.MINUTES, + new LinkedBlockingQueue<>()); + pool.allowCoreThreadTimeOut(true); + int maxNumberOfTasks = 2 * maxNumberOfThreads; + availableTaskSlots = new AtomicInteger(maxNumberOfTasks); + lock = new Object(); + } + + public CompletableFuture submit(CheckedRunnable task) throws InterruptedException { + waitForQueue(); + CompletableFuture future = new CompletableFuture<>(); + pool.submit(() -> { + try { + task.run(); + future.complete(null); + } catch (Throwable e) { + future.completeExceptionally(e); + } finally { + availableTaskSlots.incrementAndGet(); + synchronized (lock) { + lock.notify(); + } + } + }); + return future; + } + + public void waitForQueue() throws InterruptedException { + synchronized (lock) { + while (availableTaskSlots.get() <= 0) { + lock.wait(10000); + } + availableTaskSlots.decrementAndGet(); + } + } + + @Override + public void close() throws IOException { + this.pool.shutdown(); + } + + public long getTaskCount() { + return pool.getTaskCount(); + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java index f39d55327aab..a612840d9994 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ExecutionException; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.hdds.utils.MetadataKeyFilters; import org.apache.hadoop.hdds.utils.TableCacheMetrics; @@ -43,6 +44,9 @@ import org.apache.hadoop.hdds.utils.db.cache.TableNoCache; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.function.CheckedBiFunction; +import org.apache.ratis.util.function.CheckedFunction; +import org.rocksdb.LiveFileMetaData; +import org.slf4j.Logger; /** * Strongly typed table implementation. @@ -53,7 +57,7 @@ * @param type of the keys in the store. * @param type of the values in the store. */ -public class TypedTable implements Table { +public class TypedTable implements BaseRDBTable { private static final long EPOCH_DEFAULT = -1L; static final int BUFFER_SIZE_DEFAULT = 4 << 10; // 4 KB @@ -67,14 +71,16 @@ public class TypedTable implements Table { private final CodecBuffer.Capacity bufferCapacity = new CodecBuffer.Capacity(this, BUFFER_SIZE_DEFAULT); private final TableCache cache; + private final RDBParallelTableOperator parallelTableOperator; /** * The same as this(rawTable, codecRegistry, keyType, valueType, * CacheType.PARTIAL_CACHE). 
*/ - TypedTable(RDBTable rawTable, CodecRegistry codecRegistry, Class keyType, Class valueType) + TypedTable(RDBTable rawTable, CodecRegistry codecRegistry, Class keyType, Class valueType, + ThrottledThreadpoolExecutor throttledThreadpoolExecutor) throws IOException { - this(rawTable, codecRegistry, keyType, valueType, CacheType.PARTIAL_CACHE); + this(rawTable, codecRegistry, keyType, valueType, CacheType.PARTIAL_CACHE, throttledThreadpoolExecutor); } /** @@ -88,9 +94,9 @@ public class TypedTable implements Table { * @throws IOException if failed to iterate the raw table. */ TypedTable(RDBTable rawTable, CodecRegistry codecRegistry, Class keyType, Class valueType, - CacheType cacheType) throws IOException { + CacheType cacheType, ThrottledThreadpoolExecutor throttledThreadpoolExecutor) throws IOException { this(rawTable, codecRegistry.getCodecFromClass(keyType), codecRegistry.getCodecFromClass(valueType), - cacheType); + cacheType, throttledThreadpoolExecutor); } /** @@ -102,8 +108,9 @@ public class TypedTable implements Table { * @param cacheType How to cache the entries? * @throws IOException */ - public TypedTable( - RDBTable rawTable, Codec keyCodec, Codec valueCodec, CacheType cacheType) throws IOException { + public TypedTable(RDBTable rawTable, Codec keyCodec, Codec valueCodec, + CacheType cacheType, + ThrottledThreadpoolExecutor throttledThreadpoolExecutor) throws IOException { this.rawTable = Objects.requireNonNull(rawTable, "rawTable==null"); this.keyCodec = Objects.requireNonNull(keyCodec, "keyCodec == null"); this.valueCodec = Objects.requireNonNull(valueCodec, "valueCodec == null"); @@ -135,6 +142,7 @@ public TypedTable( } else { cache = TableNoCache.instance(); } + this.parallelTableOperator = new RDBParallelTableOperator<>(throttledThreadpoolExecutor, this, keyCodec); } private CodecBuffer encodeKeyCodecBuffer(KEY key) throws IOException { @@ -444,6 +452,13 @@ public Table.KeyValueIterator iterator(KEY prefix) } } + @Override + public void parallelTableOperation( + KEY startKey, KEY endKey, CheckedFunction, Void, IOException> operation, + Logger logger, int logPercentageThreshold) throws IOException, ExecutionException, InterruptedException { + this.parallelTableOperator.performTaskOnTableVals(startKey, endKey, operation, logger, logPercentageThreshold); + } + @Override public String getName() { return rawTable.getName(); @@ -558,6 +573,11 @@ TableCache getCache() { return cache; } + @Override + public List getTableSstFiles() throws IOException { + return rawTable.getTableSstFiles(); + } + /** * Key value implementation for strongly typed tables. */ diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java index 0432c7290635..42a28ea29cdf 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTable.java @@ -23,12 +23,20 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.hdds.utils.MetadataKeyFilters; +import org.apache.ratis.util.function.CheckedFunction; +import org.slf4j.Logger; /** * InMemory Table implementation for tests. 
*/ public final class InMemoryTestTable implements Table { - private final Map map = new ConcurrentHashMap<>(); + private final Map map; + private final Codec keyCodec; + + public InMemoryTestTable(Codec keyCodec) { + this.keyCodec = keyCodec; + this.map = new ConcurrentHashMap<>(); + } @Override public void close() { @@ -80,12 +88,19 @@ public void deleteRange(KEY beginKey, KEY endKey) { } @Override - public TableIterator> iterator() { - throw new UnsupportedOperationException(); + public TableIterator> iterator() throws IOException { + return new InMemoryTestTableIterator<>(this.map, null, this.keyCodec); + } + + @Override + public TableIterator> iterator(KEY prefix) throws IOException { + return new InMemoryTestTableIterator<>(this.map, prefix, this.keyCodec); } @Override - public TableIterator> iterator(KEY prefix) { + public void parallelTableOperation( + KEY startKey, KEY endKey, CheckedFunction, Void, IOException> operation, + Logger logger, int logPercentageThreshold) { throw new UnsupportedOperationException(); } diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTableIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTableIterator.java new file mode 100644 index 000000000000..a9b582aaaa18 --- /dev/null +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/InMemoryTestTableIterator.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils.db; + +import java.io.IOException; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.stream.Collectors; + +/** + * Generic Table Iterator implementation that can be used for unit tests to reduce redundant mocking in tests. + */ +public class InMemoryTestTableIterator implements Table.KeyValueIterator { + + private Iterator> itr; + private final Comparator comparator; + private final Comparator keyComparator; + private final Codec keyCodec; + private final NavigableMap values; + + public InMemoryTestTableIterator(Map values, K prefix, Codec keyCodec) throws IOException { + byte[] startKey = prefix == null ? null : keyCodec.toPersistedFormat(prefix); + byte[] endKey = prefix == null ? 
null : keyCodec.toPersistedFormat(prefix); + if (endKey != null) { + endKey[endKey.length - 1] += 1; + } + comparator = new Codec.ByteArrayComparator(); + this.keyComparator = keyCodec.comparator(); + this.keyCodec = keyCodec; + this.values = values.entrySet().stream() + .filter(e -> startKey == null || compare(startKey, e.getKey()) >= 0) + .filter(e -> endKey == null || compare(endKey, e.getKey()) < 0) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (v1, v2) -> v2, TreeMap::new)); + this.seekToFirst(); + } + + private int compare(byte[] b1, K key) { + try { + return this.comparator.compare(b1, keyCodec.toPersistedFormat(key)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void seekToFirst() { + this.itr = this.values.entrySet().stream() + .map(e -> Table.newKeyValue(e.getKey(), e.getValue())).iterator(); + } + + @Override + public void seekToLast() { + try { + this.seek(this.values.lastKey()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Table.KeyValue seek(K s) throws IOException { + this.itr = this.values.entrySet().stream() + .filter(e -> keyComparator.compare(e.getKey(), s) >= 0) + .map(e -> Table.newKeyValue(e.getKey(), e.getValue())).iterator(); + Map.Entry firstEntry = values.ceilingEntry(s); + return firstEntry == null ? null : Table.newKeyValue(firstEntry.getKey(), firstEntry.getValue()); + } + + @Override + public void removeFromDB() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + } + + @Override + public boolean hasNext() { + return this.itr.hasNext(); + } + + @Override + public Table.KeyValue next() { + return itr.next(); + } +} diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestParallelTableOperator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestParallelTableOperator.java new file mode 100644 index 000000000000..77e2ddc122a5 --- /dev/null +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestParallelTableOperator.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.utils.db; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.Stream; +import org.apache.hadoop.hdds.utils.IOUtils; +import org.apache.ratis.util.function.CheckedFunction; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test class for ParallelTableOperator. + */ +public class TestParallelTableOperator { + + private static ThrottledThreadpoolExecutor executor; + private static final Logger LOG = LoggerFactory.getLogger(TestParallelTableOperator.class); + private Map visited; + + @BeforeAll + public static void init() { + executor = new ThrottledThreadpoolExecutor(10); + } + + @AfterAll + public static void shutdown() { + IOUtils.closeQuietly(executor); + } + + @BeforeEach + public void setup() { + visited = new ConcurrentHashMap<>(); + } + + private static Stream getParallelTableOperatorArguments() { + return Stream.of( + Arguments.of(1, null, null, 1), + Arguments.of(1, 100, 200, 1), + Arguments.of(5, null, null, 8), + Arguments.of(5, 100, 200, 8), + Arguments.of(8, null, null, 5), + Arguments.of(8, 100, 200, 5) + ); + } + + private static Stream getParallelTableOperatorArgumentsBoundsException() { + return Stream.of( + Arguments.of(1, null, null), + Arguments.of(1, 100, 200), + Arguments.of(5, null, null), + Arguments.of(5, 100, 200), + Arguments.of(8, null, null), + Arguments.of(8, 100, 200) + ); + } + + @SuppressWarnings("checkstyle:ParameterNumber") + private void testParallelTableOperator(Table intTable, + ParallelTableOperator, Integer, Integer> parallelTableOperator, + int maxKeyIdx, int keyJumps, Integer start, Integer end, List bounds, + CheckedFunction, Void, IOException> opr, Supplier expectedException) + throws IOException, ExecutionException, InterruptedException { + List seekedKeys = Collections.synchronizedList(new ArrayList<>()); + when(intTable.iterator()).thenAnswer(i -> { + Table.KeyValueIterator itr = (Table.KeyValueIterator) spy(i.callRealMethod()); + doAnswer(j -> { + seekedKeys.add(null); + return j.callRealMethod(); + }).when(itr).seekToFirst(); + doAnswer(j -> { + seekedKeys.add(j.getArgument(0)); + return j.callRealMethod(); + }).when(itr).seek(any(Integer.class)); + return itr; + }); + Map expectedKeys = new HashMap<>(); + for (int i = 0; i < maxKeyIdx; i += keyJumps) { + intTable.put(i, i); + if ((start == null || start <= i) && (end == null || end > i)) { + expectedKeys.put(i, 1); + } + } + + List expectedSeekedKeys = bounds.subList(0, bounds.size() - 1); + if (expectedException != null) { + Exception e = assertThrows(ExecutionException.class, + () -> 
parallelTableOperator.performTaskOnTableVals(start, end, opr, LOG, 1)); + Exception expectedEx = expectedException.get(); + assertEquals(expectedEx.getClass(), e.getCause().getClass()); + assertEquals(expectedEx.getMessage(), e.getCause().getMessage()); + } else { + parallelTableOperator.performTaskOnTableVals(start, end, opr, LOG, 1); + assertEquals(expectedKeys, visited); + seekedKeys.sort((a, b) -> { + if (Objects.equals(a, b)) { + return 0; + } + if (a == null) { + return -1; + } + if (b == null) { + return 1; + } + return Integer.compare(a, b); + }); + assertEquals(expectedSeekedKeys, seekedKeys); + } + } + + @ParameterizedTest + @MethodSource("getParallelTableOperatorArguments") + public void testParallelTableOperator(int keyJumps, Integer start, Integer end, int boundJumps) + throws IOException, ExecutionException, InterruptedException { + int maxKeyIdx = 1001; + List bounds = new ArrayList<>(); + bounds.add(start); + for (int i = (start == null ? 0 : start) + boundJumps; + i < (end == null ? (maxKeyIdx - 1) : end); i += boundJumps) { + bounds.add(i); + } + bounds.add(end); + Table intTable = spy(new InMemoryTestTable<>(IntegerCodec.get())); + CheckedFunction, Void, IOException> opr = kv -> { + visited.compute(kv.getKey(), (k, v) -> v == null ? 1 : v + 1); + return null; + }; + + ParallelTableOperator, Integer, Integer> parallelTableOperator = + new ParallelTableOperator, Integer, Integer>(executor, intTable, IntegerCodec.get()) { + @Override + protected List getBounds(Integer startKey, Integer endKey) { + return bounds; + } + }; + testParallelTableOperator(intTable, parallelTableOperator, maxKeyIdx, keyJumps, start, end, bounds, opr, null); + } + + + @ParameterizedTest + @MethodSource("getParallelTableOperatorArgumentsBoundsException") + public void testParallelTableOperatorWithBoundsException(int keyJumps, Integer start, Integer end) + throws IOException, ExecutionException, InterruptedException { + int maxKeyIdx = 1001; + List bounds = new ArrayList<>(); + bounds.add(start); + bounds.add(end); + Table intTable = spy(new InMemoryTestTable<>(IntegerCodec.get())); + + CheckedFunction, Void, IOException> opr = kv -> { + visited.compute(kv.getKey(), (k, v) -> v == null ? 1 : v + 1); + return null; + }; + + ParallelTableOperator, Integer, Integer> parallelTableOperator = + new ParallelTableOperator, Integer, Integer>(executor, intTable, IntegerCodec.get()) { + @Override + protected List getBounds(Integer startKey, Integer endKey) throws IOException { + throw new IOException("Exception while getting bounds"); + } + }; + testParallelTableOperator(intTable, parallelTableOperator, maxKeyIdx, keyJumps, start, end, bounds, opr, null); + } + + @ParameterizedTest + @MethodSource("getParallelTableOperatorArguments") + public void testParallelTableOperatorWithException(int keyJumps, Integer start, Integer end, int boundJumps) + throws IOException, ExecutionException, InterruptedException { + int maxKeyIdx = 1001; + List bounds = new ArrayList<>(); + bounds.add(start); + for (int i = (start == null ? 0 : start) + boundJumps; + i < (end == null ? (maxKeyIdx - 1) : end); i += boundJumps) { + bounds.add(i); + } + bounds.add(end); + AtomicInteger minException = new AtomicInteger(Integer.MAX_VALUE); + Table intTable = spy(new InMemoryTestTable<>(IntegerCodec.get())); + CheckedFunction, Void, IOException> opr = kv -> { + Integer key = kv.getKey(); + visited.compute(kv.getKey(), (k, v) -> v == null ? 
1 : v + 1); + if (key != 0 && key % (3 * keyJumps) == 0) { + minException.updateAndGet((prev) -> Math.min(prev, key)); + throw new IOException("Exception while getting bounds for key " + key); + } + return null; + }; + + ParallelTableOperator, Integer, Integer> parallelTableOperator = + new ParallelTableOperator, Integer, Integer>(executor, intTable, IntegerCodec.get()) { + @Override + protected List getBounds(Integer startKey, Integer endKey) { + return bounds; + } + }; + testParallelTableOperator(intTable, parallelTableOperator, maxKeyIdx, keyJumps, start, end, bounds, opr, + () -> new IOException("Exception while getting bounds for key " + minException.get())); + } +} diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java index a2f91d5b2e1e..b41e73bee9d4 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStore.java @@ -62,7 +62,7 @@ public static RDBStore newRDBStore(File dbFile, ManagedDBOptions options, throws IOException { return new RDBStore(dbFile, options, null, new ManagedWriteOptions(), families, CodecRegistry.newBuilder().build(), false, null, false, - maxDbUpdatesSizeThreshold, true, null, true); + maxDbUpdatesSizeThreshold, true, null, true, 5); } public static final int MAX_DB_UPDATES_SIZE_THRESHOLD = 80; diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java index 37f81369f91f..13f12caf7747 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java @@ -406,9 +406,8 @@ public void testGetByteBuffer() throws Exception { final StringCodec codec = StringCodec.get(); final String tableName = families.get(0); try (RDBTable testTable = rdbStore.getTable(tableName)) { - final TypedTable typedTable = new TypedTable<>( - testTable, CodecRegistry.newBuilder().build(), - String.class, String.class); + final TypedTable typedTable = new TypedTable<>(testTable, CodecRegistry.newBuilder().build(), + String.class, String.class, null); for (int i = 0; i < 20; i++) { final int valueSize = TypedTable.BUFFER_SIZE_DEFAULT * i / 4; diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java index 1e88ee0176c7..b377741465a4 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestTypedRDBTableStore.java @@ -123,10 +123,8 @@ public void putGetAndEmpty() throws Exception { private Table createTypedTable(String name) throws IOException { - return new TypedTable( - rdbStore.getTable(name), - codecRegistry, - String.class, String.class); + return new TypedTable(rdbStore.getTable(name), + codecRegistry, String.class, String.class, rdbStore.getParallelTableIteratorPool()); } @Test @@ -253,7 +251,7 @@ public void testIteratorOnException() throws Exception { when(rdbTable.iterator((CodecBuffer) null)) .thenThrow(new IOException()); try (Table testTable = new TypedTable<>(rdbTable, - codecRegistry, 
-        codecRegistry, String.class, String.class)) {
+        codecRegistry, String.class, String.class, rdbStore.getParallelTableIteratorPool())) {
       assertThrows(IOException.class, testTable::iterator);
     }
   }
@@ -412,7 +410,7 @@ public void testByteArrayTypedTable() throws Exception {
     try (Table<byte[], byte[]> testTable = new TypedTable<>(
         rdbStore.getTable("Ten"),
         codecRegistry,
-        byte[].class, byte[].class)) {
+        byte[].class, byte[].class, null)) {
       byte[] key = new byte[] {1, 2, 3};
       byte[] value = new byte[] {4, 5, 6};
       testTable.put(key, value);
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/ThrottledThreadpoolExecutorTest.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/ThrottledThreadpoolExecutorTest.java
new file mode 100644
index 000000000000..157354e66277
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/ThrottledThreadpoolExecutorTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.utils.db;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test class for ThrottledThreadpoolExecutor.
+ */
+public class ThrottledThreadpoolExecutorTest {
+
+  private ThrottledThreadpoolExecutor executor;
+
+  @BeforeEach
+  public void setup() {
+    // coreThreads = 2, maxThreads = 4 => maxTasks = 8
+    executor = new ThrottledThreadpoolExecutor(4);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    executor.close();
+  }
+
+  @Test
+  public void testThrottlingDoesNotExceedMaxTasks() throws Exception {
+    int maxThreads = 4;
+    int maxTasks = 2 * maxThreads; // 8
+    AtomicInteger concurrentTasks = new AtomicInteger(0);
+    AtomicInteger maxObservedConcurrentTasks = new AtomicInteger(0);
+    CountDownLatch latch = new CountDownLatch(maxThreads);
+    CountDownLatch releaseLatch = new CountDownLatch(1); // To block all tasks
+
+    for (int i = 0; i < maxTasks; i++) {
+      executor.submit(() -> {
+        int current = concurrentTasks.incrementAndGet();
+        System.out.println(current);
+        maxObservedConcurrentTasks.updateAndGet(prev -> Math.max(prev, current));
+        latch.countDown();
+        releaseLatch.await(); // block the task
+        concurrentTasks.decrementAndGet();
+      });
+    }
+
+    // Wait for the first maxThreads tasks to start; all 8 submitted tasks now occupy throttle slots
+    assertTrue(latch.await(10, TimeUnit.SECONDS), "Tasks did not start in time");
+    assertEquals(maxTasks, executor.getTaskCount());
+
+    // Now try to submit one more task; it should block until a slot frees up
+    CompletableFuture<Void> extraTask = CompletableFuture.runAsync(() -> {
+      try {
+        executor.submit(() -> {
+        });
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    });
+
+    // Wait for 1 second and ensure extraTask hasn't completed yet
+    Thread.sleep(1000);
+    assertFalse(extraTask.isDone(), "Extra task should be throttled and blocked");
+    assertEquals(maxTasks, executor.getTaskCount());
+
+    // Release all initially submitted tasks.
+    releaseLatch.countDown();
+
+    // Now the extra task should complete within a few seconds
+    assertDoesNotThrow(() -> extraTask.get(5, TimeUnit.SECONDS));
+
+    // Make sure no more than maxThreads tasks ever ran concurrently
+    assertEquals(4, maxObservedConcurrentTasks.get(), "Throttling failed");
+  }
+}
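The test above relies on submit() applying back-pressure once twice the thread count is already in flight. A hedged usage sketch, using only the constructor, submit() and close() exercised by the test; the work inside the task and the surrounding method (assumed to declare throws Exception) are illustrative:

ThrottledThreadpoolExecutor executor = new ThrottledThreadpoolExecutor(4);
try {
  for (int i = 0; i < 100; i++) {
    final int taskId = i;
    // submit() blocks the caller once the throttle limit is reached,
    // instead of growing an unbounded queue of pending tasks.
    executor.submit(() -> {
      Thread.sleep(10); // simulate some work
      System.out.println("done " + taskId);
    });
  }
} finally {
  executor.close();
}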
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/BigIntegerCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/BigIntegerCodec.java
index 7b2be1cf9e87..5f1b52db55e3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/BigIntegerCodec.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/BigIntegerCodec.java
@@ -19,6 +19,8 @@
 
 import java.io.IOException;
 import java.math.BigInteger;
+import java.util.Comparator;
+import java.util.Objects;
 import org.apache.hadoop.hdds.utils.db.Codec;
 
 /**
@@ -27,6 +29,7 @@ public final class BigIntegerCodec implements Codec<BigInteger> {
 
   private static final Codec<BigInteger> INSTANCE = new BigIntegerCodec();
+  private static final Comparator<BigInteger> COMPARATOR = (o1, o2) -> Objects.compare(o1, o2, BigInteger::compareTo);
 
   public static Codec<BigInteger> get() {
     return INSTANCE;
@@ -55,4 +58,9 @@ public BigInteger fromPersistedFormat(byte[] rawData) throws IOException {
   public BigInteger copyObject(BigInteger object) {
     return object;
   }
+
+  @Override
+  public Comparator<BigInteger> comparator() {
+    return COMPARATOR;
+  }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 2138db275a04..ca2ac8ad8c41 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -171,6 +171,9 @@ private OMConfigKeys() {
   public static final String OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT = "5m";
   public static final String OZONE_OM_SNAPSHOT_ROCKSDB_METRICS_ENABLED =
       "ozone.om.snapshot.rocksdb.metrics.enabled";
+  public static final String OZONE_OM_SNAPSHOT_DB_PARALLEL_ITERATOR_MAX_POOL_SIZE =
+      "ozone.om.snapshot.db.parallel.iterator.pool.size";
+  public static final int OZONE_OM_SNAPSHOT_DB_PARALLEL_ITERATOR_MAX_POOL_SIZE_DEFAULT = 1;
   public static final boolean OZONE_OM_SNAPSHOT_ROCKSDB_METRICS_ENABLED_DEFAULT =
       false;
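The new key is resolved with the usual Hadoop configuration lookup before being passed into the snapshot DB load path shown in the next hunk; a small sketch of that lookup (purely illustrative, mirroring the code below):

OzoneConfiguration conf = new OzoneConfiguration();
// Defaults to 1 when ozone.om.snapshot.db.parallel.iterator.pool.size is unset.
int poolSize = conf.getInt(
    OMConfigKeys.OZONE_OM_SNAPSHOT_DB_PARALLEL_ITERATOR_MAX_POOL_SIZE,
    OMConfigKeys.OZONE_OM_SNAPSHOT_DB_PARALLEL_ITERATOR_MAX_POOL_SIZE_DEFAULT);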
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index d5fe33958aab..d868d0c84dfd 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -25,6 +25,8 @@
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_MAX_OPEN_FILES_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DB_PARALLEL_ITERATOR_MAX_POOL_SIZE;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DB_PARALLEL_ITERATOR_MAX_POOL_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_ROCKSDB_METRICS_ENABLED;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_ROCKSDB_METRICS_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_CHECKPOINT_DIR_CREATION_POLL_TIMEOUT;
@@ -393,7 +395,7 @@ private OmMetadataManagerImpl(OzoneConfiguration conf, File dir, String name)
     int maxOpenFiles = conf.getInt(OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES,
         OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT);
     setStore(loadDB(conf, dir, name, true, Optional.of(Boolean.TRUE),
-        maxOpenFiles, false, false, true));
+        maxOpenFiles, false, false, true, Optional.empty()));
     initializeOmTables(CacheType.PARTIAL_CACHE, false);
     perfMetrics = null;
   }
@@ -425,10 +427,12 @@ private OmMetadataManagerImpl(OzoneConfiguration conf, File dir, String name)
         // Check if the snapshot directory exists.
         checkSnapshotDirExist(checkpoint);
       }
+      int parallelTableIteratorMaxPoolSize = conf.getInt(OZONE_OM_SNAPSHOT_DB_PARALLEL_ITERATOR_MAX_POOL_SIZE,
+          OZONE_OM_SNAPSHOT_DB_PARALLEL_ITERATOR_MAX_POOL_SIZE_DEFAULT);
       setStore(loadDB(conf, metaDir, dbName, false,
           java.util.Optional.of(Boolean.TRUE),
           maxOpenFiles, false, false,
           conf.getBoolean(OZONE_OM_SNAPSHOT_ROCKSDB_METRICS_ENABLED,
-              OZONE_OM_SNAPSHOT_ROCKSDB_METRICS_ENABLED_DEFAULT)));
+              OZONE_OM_SNAPSHOT_ROCKSDB_METRICS_ENABLED_DEFAULT), Optional.of(parallelTableIteratorMaxPoolSize)));
       initializeOmTables(CacheType.PARTIAL_CACHE, false);
     } catch (IOException e) {
       stop();
@@ -572,7 +576,7 @@ public void start(OzoneConfiguration configuration) throws IOException {
   public static DBStore loadDB(OzoneConfiguration configuration, File metaDir,
       int maxOpenFiles) throws IOException {
     return loadDB(configuration, metaDir, OM_DB_NAME, false,
-        java.util.Optional.empty(), maxOpenFiles, true, true, true);
+        java.util.Optional.empty(), maxOpenFiles, true, true, true, Optional.empty());
   }
 
   @SuppressWarnings("checkstyle:parameternumber")
@@ -582,7 +586,8 @@ public static DBStore loadDB(OzoneConfiguration configuration, File metaDir,
       int maxOpenFiles,
       boolean enableCompactionDag,
       boolean createCheckpointDirs,
-      boolean enableRocksDBMetrics)
+      boolean enableRocksDBMetrics,
+      Optional<Integer> parallelTableIteratorMaxPoolSize)
       throws IOException {
     RocksDBConfiguration rocksDBConfiguration =
         configuration.getObject(RocksDBConfiguration.class);
@@ -596,6 +601,7 @@ public static DBStore loadDB(OzoneConfiguration configuration, File metaDir,
         .setEnableRocksDbMetrics(enableRocksDBMetrics);
     disableAutoCompaction.ifPresent(
         dbStoreBuilder::disableDefaultCFAutoCompaction);
+    parallelTableIteratorMaxPoolSize.ifPresent(dbStoreBuilder::setParallelTableIteratorMaxPoolSize);
     return addOMTablesAndCodecs(dbStoreBuilder).build();
   }
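For callers of the widened loadDB signature, a hedged sketch of passing an explicit pool size. The argument order follows the snapshot call site in the hunk above; conf, metaDir, dbName and maxOpenFiles are assumed to be in scope, and the chosen values are illustrative only:

// Open a snapshot DB with RocksDB metrics enabled and a parallel table
// iterator pool of 4 threads; the other flags mirror the call site above.
DBStore snapshotStore = OmMetadataManagerImpl.loadDB(conf, metaDir, dbName, false,
    java.util.Optional.of(Boolean.TRUE), maxOpenFiles, false, false,
    true, java.util.Optional.of(4));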