HDDS-12779. Parallelize table iteration across different ranges #8243
Conversation
…h different ranges Change-Id: Ib08912cc23a4ff4ccc9898b082366a52dd54c363
@swamirishi , thanks for working on this! This is a very big change. Could you describe what you are trying to do in some detail? What is the algorithm in your mind? The two-line description is way too simple to cover such a large code change.
Change-Id: Iae11938afde4c28ac6459054a582a4ed9c920ac8
@szetszwo Thank you for starting to review the PR. I have updated the PR description with the details about the patch.
The bulk of this change looks related to parallel iteration, not parallel deserialization and consumption. I would recommend:
FYI, even in this patch we work with a RocksDB snapshot, which is very cheap to take; it just pins a snapshot of the DB for the iterator. The snapshot can be released once we are done with the entire iteration, making the whole iteration a read over a consistent view.
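For illustration, here is a minimal sketch of snapshot-pinned iteration using the public RocksDB Java API (the method name iterateWithSnapshot and the BiConsumer callback are assumptions for this sketch, not code from the patch):

import java.util.function.BiConsumer;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;

// Pin a snapshot, iterate over that consistent view, then release it.
static void iterateWithSnapshot(RocksDB db, BiConsumer<byte[], byte[]> consumer) {
  final Snapshot snapshot = db.getSnapshot();
  try (ReadOptions readOptions = new ReadOptions().setSnapshot(snapshot);
       RocksIterator iter = db.newIterator(readOptions)) {
    for (iter.seekToFirst(); iter.isValid(); iter.next()) {
      consumer.accept(iter.key(), iter.value());
    }
  } finally {
    // Release the snapshot only after the entire iteration is done, so the
    // whole pass reads one consistent view of the DB.
    db.releaseSnapshot(snapshot);
  }
}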
What are the assumptions of the performance improvement?
We have
Instead of having multiple threads reading files, it is better to have multiple threads processing data. RocksDB itself is already very good at parallelism. It is unlikely that Ozone could use the internal details of RocksDB to improve performance. Also, Ozone should use only the public APIs of RocksDB; it is hard to maintain such code, and it may even cause data corruption silently. BTW, you may consider parallelizing your pull requests -- having multiple small PRs instead of a single large PR. Then, different people can review different PRs at the same time.
public final class BigIntegerCodec implements Codec<BigInteger> {

  private static final Codec<BigInteger> INSTANCE = new BigIntegerCodec();
  private static final Comparator<BigInteger> COMPARATOR =
      (o1, o2) -> Objects.compare(o1, o2, BigInteger::compareTo);
@swamirishi , it is hard to make sure the comparators are consistent. It fails in the example below.
static final Codec<Object> WRAPPER = new Codec<Object>() {
@Override
public Class<Object> getTypeClass() { return null; }
@Override
public byte[] toPersistedFormat(Object object) throws IOException {
return BigIntegerCodec.get().toPersistedFormat((BigInteger) object);
}
@Override
public Object fromPersistedFormat(byte[] rawData) { return null; }
@Override
public Object copyObject(Object object) {
return object;
}
};
static final Comparator<Object> codecComparator = WRAPPER.comparator();
static final Comparator<BigInteger> bigIntegerComparator = BigIntegerCodec.get().comparator();
static void assertConsistency(BigInteger left, BigInteger right) {
final int codecDiff = codecComparator.compare(left, right);
final int bigIntegerDiff = bigIntegerComparator.compare(left, right);
System.out.println("codecDiff=" + codecDiff + ", bigIntegerDiff=" + bigIntegerDiff);
if (codecDiff == bigIntegerDiff) {
return;
}
if (codecDiff * (long) bigIntegerDiff <= 0L) {
// == 0: cannot be both 0 since they are unequal
// < 0: different signs
throw new IllegalStateException("INCONSISTENCY: codecDiff=" + codecDiff
+ " but bigIntegerDiff=" + bigIntegerDiff
+ ", where left=" + left + " and right=" + right);
}
}
public static void main(String[] args) {
assertConsistency(BigInteger.ZERO, BigInteger.ONE);
// It won't work for negative numbers !!!
assertConsistency(BigInteger.ZERO, BigInteger.ONE.negate());
}
@szetszwo If this is the case, our prefix-based iterator is broken, since we rely on rocksdb's ordering being the same as the KeyCodec's ordering. Look at
Line 98 in 194077a
(prefix == null || startsWithPrefix(key()));
We stop the TypedTableIterator, saying the raw iterator doesn't have any more entries corresponding to the typed prefix.
ozone/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
Lines 681 to 684 in e5ef35d
public boolean hasNext() {
  return rawIterator.hasNext();
}
Yes, I agree we don't have a prefix-based iterator use case for BigInteger, but we have to be careful about claiming that the prefix-based iterator is foolproof and would work with any defined Codec.
Take BigInteger as an example: the iterator always uses byte ordering, not BigInteger ordering. So there is no problem.
@swamirishi , yes, UTF_16LE does not work, but we are not using it. So, what's the point?
We are doing prefix search -- ah, you are saying that if a codec moves the bits around, it won't work. Yes, totally agree.
We do similar Long comparisons for one of the Recon APIs; we are only fine because we have the container long value>0. But I am not sure if we are handling overflow cases properly (I know it is practically impossible to run out of long values); I am just wondering if we are handling things fine for any Integer and Long comparisons.
... container long value>0 ...
@swamirishi , are we doing prefix search for container id? If not, it is fine even for negative numbers. The iterator will work fine; it just uses a weird ordering rather than the numerical ordering.
As a summary,
- prefix search requires a prefix-preserving codec.
As an analogy,
- binary search requires a sorted list.
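To make the negative-number case concrete, here is a small self-contained illustration (not code from Ozone): a fixed-length big-endian long encoding sorts negative keys after positive ones under unsigned byte comparison, so byte ordering and numerical ordering disagree, which is the "weird ordering" mentioned above.

import java.nio.ByteBuffer;
import java.util.Arrays;

public class ByteVsNumericOrder {
  // Fixed-length big-endian encoding of a long; illustrative only.
  static byte[] encode(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  public static void main(String[] args) {
    final int numeric = Long.compare(-1L, 1L);                             // < 0: -1 is numerically smaller
    final int byteOrder = Arrays.compareUnsigned(encode(-1L), encode(1L)); // > 0: 0xFF... sorts after 0x00...01
    System.out.println("numeric=" + numeric + ", byteOrder=" + byteOrder);
  }
}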
Closing this PR in favour of breaking it into smaller PRs.
What changes were proposed in this pull request?
Currently, most of the operations performed on the tables are agnostic to the order in which keys are iterated, and such operations can be executed in parallel to speed them up. The proposal here is to introduce a central functionality to achieve this.
The idea behind this parallelization approach is to peek into RocksDB's SST file metadata to get the key ranges (smallestKey and largestKey) from the LiveFileMetaData corresponding to the RocksDB column family, and thereby obtain an approximate set of key ranges in the table. The table is thus effectively split for iteration based on the number of SST files in the DB.
For instance consider the case:
1.sst has key range [k1 - k10]
2.sst has key range [k5 - k15]
3.sst has key range [k15 - k30].
Thus, with this approach, we would split the table into 4 iterators covering the key ranges:
[k1 - k5), [k5, k10), [k10, k15), [k15, k30]
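As a hedged sketch (the helper name keyRanges is illustrative, not the patch's code), the boundaries could be derived from RocksDB's public getLiveFilesMetaData() API roughly as follows; for the example above this yields exactly the four ranges listed:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import org.rocksdb.LiveFileMetaData;
import org.rocksdb.RocksDB;

// Derive approximate [start, end) key ranges for one column family from the
// smallest/largest keys of its live SST files.
static List<byte[][]> keyRanges(RocksDB db, byte[] columnFamilyName) {
  final TreeSet<byte[]> boundaries = new TreeSet<>(Arrays::compareUnsigned);
  for (LiveFileMetaData file : db.getLiveFilesMetaData()) {
    if (Arrays.equals(file.columnFamilyName(), columnFamilyName)) {
      boundaries.add(file.smallestKey());
      boundaries.add(file.largestKey());
    }
  }
  final List<byte[]> sorted = new ArrayList<>(boundaries);
  final List<byte[][]> ranges = new ArrayList<>();
  for (int i = 0; i + 1 < sorted.size(); i++) {
    // The last boundary is the largest key overall, so a real implementation
    // would treat the end of the final range as inclusive.
    ranges.add(new byte[][] {sorted.get(i), sorted.get(i + 1)});
  }
  return ranges;
}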
Each of these ranges can be iterated by a separate thread, as a task within a thread pool. Similar to a pub-sub model, an operation defined by the caller of this function is performed on each key.
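A minimal sketch of that consumption model, assuming the keyRanges helper above and a caller-supplied BiConsumer (the method name iterateRangesInParallel is hypothetical, not the patch's API):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;

// Submit one task per [start, end) range; every key/value pair in the range is
// handed to the caller-supplied consumer, pub-sub style.
static List<Future<?>> iterateRangesInParallel(RocksDB db, List<byte[][]> ranges,
    ExecutorService pool, BiConsumer<byte[], byte[]> consumer) {
  final List<Future<?>> futures = new ArrayList<>();
  for (byte[][] range : ranges) {
    futures.add(pool.submit(() -> {
      try (RocksIterator iter = db.newIterator()) {
        for (iter.seek(range[0]);
             iter.isValid() && Arrays.compareUnsigned(iter.key(), range[1]) < 0;
             iter.next()) {
          consumer.accept(iter.key(), iter.value());
        }
      }
    }));
  }
  return futures;
}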
The only con I see with this approach is that, since all of these table iterators are initialized independently, consistency could be an issue. E.g., if a key is renamed from k to k' in the file table while we are iterating through the DB, then depending on the iterator initialization order we could see both k and k', neither, or just one of them. From what I understand, we do not have use cases where we would iterate through table ranges with active mutation happening in those ranges (OM background garbage collection, Recon bootstrap, SCM initialization, etc.). Since I only see use cases where consistency is not really key, we can live with this.
The patch also includes a centralized threadpool responsible for throttling the number of RocksDB table iterators initialized, governed by the configuration "hadoop.hdds.db.rocksdb.parallel.iterator.max.pool.size". This has been put in place as a guardrail so that such background iterations do not impact main system performance while they are running.
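A possible shape for that guardrail, assuming the helpers above (the default value of 4 here is an assumption of this sketch, not the patch's default):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;

// One shared, bounded pool: at most maxPoolSize range iterators run
// concurrently, no matter how many callers request parallel iteration.
static ExecutorService newParallelIteratorPool(Configuration conf) {
  final int maxPoolSize = conf.getInt(
      "hadoop.hdds.db.rocksdb.parallel.iterator.max.pool.size", 4);
  return Executors.newFixedThreadPool(maxPoolSize);
}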
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/HDDS-12779
How was this patch tested?
Adding unit tests.