Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hdds.utils.db;

import jakarta.annotation.Nonnull;
import java.util.Comparator;
import java.util.Objects;

/**
* Codec to serialize/deserialize {@link Boolean}.
Expand All @@ -27,6 +29,7 @@ public final class BooleanCodec implements Codec<Boolean> {
private static final byte TRUE = 1;
private static final byte FALSE = 0;
private static final BooleanCodec INSTANCE = new BooleanCodec();
private static final Comparator<Boolean> COMPARATOR = (o1, o2) -> Objects.compare(o1, o2, Boolean::compare);

public static BooleanCodec get() {
return INSTANCE;
Expand Down Expand Up @@ -75,4 +78,9 @@ public Boolean fromPersistedFormat(byte[] rawData) {
public Boolean copyObject(Boolean object) {
  // Boolean is immutable, so sharing the same instance is a safe "copy".
  return object;
}

@Override
public Comparator<Boolean> comparator() {
  // Stateless shared comparator (natural Boolean order: false < true).
  // Objects.compare treats two nulls as equal, but comparing a single null
  // against a non-null unboxes and throws NPE — TODO confirm callers never
  // compare mixed null/non-null values.
  return COMPARATOR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.Objects;

/**
* Codec interface to serialize/deserialize objects to/from bytes.
Expand All @@ -29,6 +32,7 @@
*/
public interface Codec<T> {
byte[] EMPTY_BYTE_ARRAY = {};
ByteArrayComparator COMPARATOR = new ByteArrayComparator();

/** @return the class of the {@link T}. */
Class<T> getTypeClass();
Expand Down Expand Up @@ -112,4 +116,36 @@ default T fromCodecBuffer(@Nonnull CodecBuffer buffer) throws IOException {
* the returned object can possibly be the same as the given object.
*/
T copyObject(T object);

/**
 * @return a {@link Comparator} ordering objects by the lexicographic,
 * unsigned byte order of their {@link #toPersistedFormat(Object)} output.
 * Null sorts before any non-null value; two nulls compare equal.
 * Serialization failures surface as {@link java.io.UncheckedIOException}.
 */
default Comparator<T> comparator() {
  return (o1, o2) -> {
    // Handle nulls up front: the previous Objects.compare(...) call forwarded
    // a single null straight into the byte[] comparator and threw NPE,
    // making the null checks dead code. nulls-first keeps both-null == 0.
    if (o1 == null || o2 == null) {
      return Boolean.compare(o2 == null, o1 == null);
    }
    try {
      return COMPARATOR.compare(toPersistedFormat(o1), toPersistedFormat(o2));
    } catch (IOException e) {
      // UncheckedIOException (a RuntimeException subclass) for consistency
      // with DelegatedCodec's comparator; callers catching RuntimeException
      // are unaffected.
      throw new UncheckedIOException("Failed to serialize object for comparison", e);
    }
  };
}

/**
 * Compares two byte arrays lexicographically, treating each byte as an
 * unsigned value (0..255). When one array is a proper prefix of the other,
 * the shorter array sorts first. Stateless and thread-safe; Serializable so
 * it can be embedded in serializable data structures.
 */
class ByteArrayComparator implements Comparator<byte[]>, Serializable {
  // Explicit serialVersionUID: a Serializable class without one gets a
  // compiler-generated UID that changes with any edit, breaking
  // deserialization compatibility.
  private static final long serialVersionUID = 1L;

  @Override
  public int compare(byte[] o1, byte[] o2) {
    final int len = Math.min(o1.length, o2.length);
    for (int i = 0; i < len; i++) {
      // Compare as unsigned ints; Integer.compare avoids relying on
      // subtraction semantics and normalizes the result's sign.
      final int cmp = Integer.compare(Byte.toUnsignedInt(o1[i]), Byte.toUnsignedInt(o2[i]));
      if (cmp != 0) {
        return cmp;
      }
    }
    // All shared bytes equal: the shorter (prefix) array sorts first.
    return Integer.compare(o1.length, o2.length);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.Objects;
import org.apache.ratis.util.function.CheckedFunction;

/**
Expand Down Expand Up @@ -128,4 +131,18 @@ public T copyObject(T message) {
throw new IllegalStateException("Failed to copyObject", e);
}
}

@Override
public Comparator<T> comparator() {
  // Order values by converting each one back to the delegate type and
  // deferring to the delegate codec's comparator. A null input stays null
  // and is handled by Objects.compare (two nulls are equal).
  return (left, right) -> {
    try {
      final DELEGATE leftDelegate = left == null ? null : backward.apply(left);
      final DELEGATE rightDelegate = right == null ? null : backward.apply(right);
      return Objects.compare(leftDelegate, rightDelegate, delegate.comparator());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@

import jakarta.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.Objects;

/**
* Codec to serialize/deserialize {@link Integer}.
*/
public final class IntegerCodec implements Codec<Integer> {

private static final IntegerCodec INSTANCE = new IntegerCodec();
private static final Comparator<Integer> COMPARATOR = (o1, o2) -> Objects.compare(o1, o2, Integer::compare);

public static IntegerCodec get() {
return INSTANCE;
Expand Down Expand Up @@ -70,4 +73,9 @@ public Integer fromPersistedFormat(byte[] rawData) {
public Integer copyObject(Integer object) {
  // Integer is immutable, so sharing the same instance is a safe "copy".
  return object;
}

@Override
public Comparator<Integer> comparator() {
  // Stateless shared comparator (natural signed int order).
  // Objects.compare treats two nulls as equal, but comparing a single null
  // against a non-null unboxes and throws NPE — TODO confirm callers never
  // compare mixed null/non-null values.
  return COMPARATOR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@

import jakarta.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.Objects;

/**
* Codec to serialize/deserialize {@link Long}.
*/
public final class LongCodec implements Codec<Long> {
private static final LongCodec CODEC = new LongCodec();
private static final Comparator<Long> COMPARATOR = (o1, o2) -> Objects.compare(o1, o2, Long::compare);

public static LongCodec get() {
return CODEC;
Expand Down Expand Up @@ -74,4 +77,9 @@ public Long fromPersistedFormat(byte[] rawData) {
public Long copyObject(Long object) {
  // Long is immutable, so sharing the same instance is a safe "copy".
  return object;
}

@Override
public Comparator<Long> comparator() {
  // Stateless shared comparator (natural signed long order).
  // Objects.compare treats two nulls as equal, but comparing a single null
  // against a non-null unboxes and throws NPE — TODO confirm callers never
  // compare mixed null/non-null values.
  return COMPARATOR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@

import jakarta.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.Objects;

/**
* Codec to serialize/deserialize {@link Short}.
*/
public final class ShortCodec implements Codec<Short> {

private static final ShortCodec INSTANCE = new ShortCodec();
private static final Comparator<Short> COMPARATOR = (o1, o2) -> Objects.compare(o1, o2, Short::compare);

public static ShortCodec get() {
return INSTANCE;
Expand Down Expand Up @@ -71,4 +74,9 @@ public Short fromPersistedFormat(byte[] rawData) {
public Short copyObject(Short object) {
  // Short is immutable, so sharing the same instance is a safe "copy".
  return object;
}

@Override
public Comparator<Short> comparator() {
  // Stateless shared comparator (natural signed short order).
  // Objects.compare treats two nulls as equal, but comparing a single null
  // against a non-null unboxes and throws NPE — TODO confirm callers never
  // compare mixed null/non-null values.
  return COMPARATOR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.Comparator;
import java.util.Objects;
import java.util.function.Function;
import org.apache.hadoop.hdds.StringUtils;
Expand All @@ -39,6 +40,7 @@
*/
abstract class StringCodecBase implements Codec<String> {
static final Logger LOG = LoggerFactory.getLogger(StringCodecBase.class);
private static final Comparator<String> COMPARATOR = (o1, o2) -> Objects.compare(o1, o2, String::compareTo);

private final Charset charset;
private final boolean fixedLength;
Expand Down Expand Up @@ -197,4 +199,9 @@ public String fromPersistedFormat(byte[] bytes) {
public String copyObject(String object) {
  // String is immutable, so sharing the same instance is a safe "copy".
  return object;
}

@Override
public Comparator<String> comparator() {
  // Natural String order (UTF-16 code-unit comparison). NOTE(review): for
  // supplementary characters this can disagree with the byte-wise order of
  // the UTF-8 persisted form — confirm callers do not require consistency
  // with persisted-byte ordering. A single null compared against a non-null
  // still throws NPE from String::compareTo.
  return COMPARATOR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ public final class OzoneConfigKeys {
public static final String OZONE_CLIENT_EC_GRPC_WRITE_TIMEOUT =
"ozone.client.ec.grpc.write.timeout";
public static final String OZONE_CLIENT_EC_GRPC_WRITE_TIMEOUT_DEFAULT = "30s";
public static final String OZONE_PARALLEL_DB_ITERATORS_CORE_POOL_SIZE = "ozone.db.parallel.iterators.core.pool.size";
public static final int OZONE_PARALLEL_DB_ITERATORS_CORE_POOL_SIZE_DEFAULT = 0;
public static final String OZONE_PARALLEL_DB_ITERATORS_MAX_POOL_SIZE = "ozone.db.parallel.iterators.max.pool.size";
public static final int OZONE_PARALLEL_DB_ITERATORS_MAX_POOL_SIZE_DEFAULT = 10;


/**
Expand Down
8 changes: 8 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3963,6 +3963,14 @@
OM grpc server netty boss event group size.
</description>
</property>
<property>
<name>ozone.om.snapshot.db.parallel.iterator.pool.size</name>
<value>1</value>
<tag>OZONE, OM</tag>
<description>
Maximum iterator pool size for iterating through the DB in parallel.
</description>
</property>

<property>
<name>ozone.om.grpc.workergroup.size</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.ratis.util.function.CheckedFunction;
import org.slf4j.Logger;

/**
* Wrapper class to represent a table in a datanode RocksDB instance.
Expand Down Expand Up @@ -89,6 +92,14 @@ public void deleteWithBatch(BatchOperation batch, KEY key)
"version.");
}

@Override
public void parallelTableOperation(
    KEY startKey, KEY endKey, CheckedFunction<KeyValue<KEY, VALUE>, Void, IOException> operation,
    Logger logger, int logPercentageThreshold)
    throws IOException, ExecutionException, InterruptedException {
  // Straight delegation: the wrapped RocksDB table performs the actual
  // parallel iteration over [startKey, endKey].
  table.parallelTableOperation(startKey, endKey, operation, logger, logPercentageThreshold);
}

@Override
public String getName() throws IOException {
return table.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.time.Clock;
import org.apache.hadoop.hdds.utils.db.InMemoryTestTable;
import org.apache.hadoop.hdds.utils.db.LongCodec;

/**
* Helper utility to test container impl.
Expand All @@ -33,10 +34,10 @@ public static ContainerSet newContainerSet() {
}

public static ContainerSet newContainerSet(long recoveringTimeout) {
return ContainerSet.newRwContainerSet(new InMemoryTestTable<>(), recoveringTimeout);
return ContainerSet.newRwContainerSet(new InMemoryTestTable<>(LongCodec.get()), recoveringTimeout);
}

public static ContainerSet newContainerSet(long recoveringTimeout, Clock clock) {
return new ContainerSet(new InMemoryTestTable<>(), recoveringTimeout, clock);
return new ContainerSet(new InMemoryTestTable<>(LongCodec.get()), recoveringTimeout, clock);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.utils.db;

import java.io.IOException;
import java.util.List;
import org.rocksdb.LiveFileMetaData;

/**
 * A {@link Table} backed by RocksDB, exposing RocksDB-specific metadata in
 * addition to the generic table API.
 */
public interface BaseRDBTable<KEY, VALUE> extends Table<KEY, VALUE> {
  /**
   * @return metadata of the live SST files for this table — presumably
   *     scoped to this table's column family; confirm with implementations.
   * @throws IOException if the underlying DB cannot be queried.
   */
  List<LiveFileMetaData> getTableSstFiles() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long limitCount)
throws IOException;

/**
* Gets the executor pool parallel table iterator.
*/
ThrottledThreadpoolExecutor getParallelTableIteratorPool();

/**
* Return if the underlying DB is closed. This call is thread safe.
* @return true if the DB is closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public final class DBStoreBuilder {
// number in request to avoid increase in heap memory.
private long maxDbUpdatesSizeThreshold;
private Integer maxNumberOfOpenFiles = null;
private int parallelTableIteratorMaxPoolSize;

/**
* Create DBStoreBuilder from a generic DBDefinition.
Expand Down Expand Up @@ -156,8 +157,11 @@ private DBStoreBuilder(ConfigurationSource configuration,
this.maxDbUpdatesSizeThreshold = (long) configuration.getStorageSize(
OZONE_OM_DELTA_UPDATE_DATA_SIZE_MAX_LIMIT,
OZONE_OM_DELTA_UPDATE_DATA_SIZE_MAX_LIMIT_DEFAULT, StorageUnit.BYTES);
this.parallelTableIteratorMaxPoolSize = rocksDBConfiguration.getParallelIteratorMaxPoolSize();
}



public static File getDBDirPath(DBDefinition definition,
ConfigurationSource configuration) {
// Set metadata dirs.
Expand Down Expand Up @@ -201,6 +205,10 @@ private void setDBOptionsProps(ManagedDBOptions dbOptions) {
}
}

/**
 * Overrides the maximum thread-pool size used for parallel table iteration
 * (the configured default is read from RocksDBConfiguration in the
 * constructor). NOTE(review): build() rejects a value of zero.
 */
public void setParallelTableIteratorMaxPoolSize(int parallelTableIteratorMaxPoolSize) {
  this.parallelTableIteratorMaxPoolSize = parallelTableIteratorMaxPoolSize;
}

/**
* Builds a DBStore instance and returns that.
*
Expand All @@ -227,11 +235,13 @@ public DBStore build() throws IOException {
if (!dbFile.getParentFile().exists()) {
throw new IOException("The DB destination directory should exist.");
}

if (this.parallelTableIteratorMaxPoolSize == 0) {
throw new IOException("The parallel table iterator max pool size cannot be zero.");
}
return new RDBStore(dbFile, rocksDBOption, statistics, writeOptions, tableConfigs,
registry.build(), openReadOnly, dbJmxBeanNameName, enableCompactionDag,
maxDbUpdatesSizeThreshold, createCheckpointDirs, configuration,
enableRocksDbMetrics);
enableRocksDbMetrics, this.parallelTableIteratorMaxPoolSize);
} finally {
tableConfigs.forEach(TableConfig::close);
}
Expand Down
Loading