Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,47 @@ public void testReaderGetRecordIterator() throws Exception {
}
}

@Test
public void testReaderGetRecordIteratorByKeys() throws Exception {
  // Exercises exact-key lookups through getIndexedRecordsByKeysIterator: first with a key that is
  // only a prefix of the stored keys, then with a mixed list of present, absent and out-of-order keys.
  writeFileWithSimpleSchema();
  HoodieAvroHFileReader hfileReader = (HoodieAvroHFileReader) createReader(new Configuration());
  Schema avroSchema = getSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");

  // Look up "key", which is not an exact match for any stored key.
  Iterator<IndexedRecord> partialKeyLookup =
      hfileReader.getIndexedRecordsByKeysIterator(Collections.singletonList("key"), avroSchema);
  List<GenericRecord> partialKeyMatches = toStream(partialKeyLookup)
      .map(r -> (GenericRecord) r)
      .collect(Collectors.toList());

  // Full contents of the file, used below to derive the expected matches.
  List<GenericRecord> allRecords = toStream(hfileReader.getRecordIterator())
      .map(r -> (GenericRecord) r.getData())
      .collect(Collectors.toList());

  // No entries should match, since the lookup requires an exact key match.
  assertEquals(Collections.emptyList(), partialKeyMatches);

  // Look up "key00001, key05, key12, key24, key16, key31, key49, key61, key50". Only valid entries should be matched:
  // - key00001 should not match.
  // - key16 exists, but it's not in the sorted order of the keys passed in, so the matching entry will not be returned.
  // - key61 is greater than the max key.
  // - by the time we reach key50 the cursor is at EOF, so no entries will be returned.
  List<String> keysExpectedToMatch = Arrays.asList("key05", "key12", "key24", "key31", "key49");
  List<GenericRecord> expectedKey1s = allRecords.stream()
      .filter(entry -> {
        String rowKey = entry.get("_row_key").toString();
        return keysExpectedToMatch.stream().anyMatch(rowKey::contains);
      })
      .collect(Collectors.toList());

  Iterator<IndexedRecord> multiKeyLookup = hfileReader.getIndexedRecordsByKeysIterator(
      Arrays.asList("key00001", "key05", "key12", "key24", "key16", "key31", "key49", "key61", "key50"), avroSchema);
  List<GenericRecord> multiKeyMatches =
      StreamSupport.stream(Spliterators.spliteratorUnknownSize(multiKeyLookup, Spliterator.ORDERED), false)
          .map(r -> (GenericRecord) r)
          .collect(Collectors.toList());
  assertEquals(expectedKey1s, multiKeyMatches);
}

@Test
public void testReaderGetRecordIteratorByKeyPrefixes() throws Exception {
writeFileWithSimpleSchema();
Expand Down Expand Up @@ -303,11 +344,11 @@ public void testReaderGetRecordIteratorByKeyPrefixes() throws Exception {
.collect(Collectors.toList());
assertEquals(Collections.emptyList(), recordsByPrefix);

// filter for "key50" and "key1" : entries from key50 and 'key10 to key19' should be matched.
// filter for "key1", "key30" and "key6" : entries from 'key10 to key19' and 'key30' should be matched.
List<GenericRecord> expectedKey50and1s = allRecords.stream().filter(entry -> (entry.get("_row_key").toString()).contains("key1")
|| (entry.get("_row_key").toString()).contains("key50")).collect(Collectors.toList());
|| (entry.get("_row_key").toString()).contains("key30")).collect(Collectors.toList());
iterator =
hfileReader.getIndexedRecordsByKeyPrefixIterator(Arrays.asList("key50", "key1"), avroSchema);
hfileReader.getIndexedRecordsByKeyPrefixIterator(Arrays.asList("key1", "key30","key6"), avroSchema);
recordsByPrefix =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
.map(r -> (GenericRecord)r)
Expand All @@ -318,7 +359,7 @@ public void testReaderGetRecordIteratorByKeyPrefixes() throws Exception {
List<GenericRecord> expectedKey50and0s = allRecords.stream().filter(entry -> (entry.get("_row_key").toString()).contains("key0")
|| (entry.get("_row_key").toString()).contains("key50")).collect(Collectors.toList());
iterator =
hfileReader.getIndexedRecordsByKeyPrefixIterator(Arrays.asList("key50", "key0"), avroSchema);
hfileReader.getIndexedRecordsByKeyPrefixIterator(Arrays.asList("key0", "key50"), avroSchema);
recordsByPrefix =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
.map(r -> (GenericRecord)r)
Expand All @@ -330,7 +371,7 @@ public void testReaderGetRecordIteratorByKeyPrefixes() throws Exception {
.filter(entry -> (entry.get("_row_key").toString()).contains("key1") || (entry.get("_row_key").toString()).contains("key0"))
.collect(Collectors.toList());
iterator =
hfileReader.getIndexedRecordsByKeyPrefixIterator(Arrays.asList("key1", "key0"), avroSchema);
hfileReader.getIndexedRecordsByKeyPrefixIterator(Arrays.asList("key0", "key1"), avroSchema);
recordsByPrefix =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
.map(r -> (GenericRecord)r)
Expand All @@ -342,6 +383,22 @@ public int compare(GenericRecord o1, GenericRecord o2) {
}
});
assertEquals(expectedKey1sand0s, recordsByPrefix);

// We expect the keys to be looked up in sorted order. If not, matching entries may not be returned.
// key1 should have matching entries, but not key0.
iterator =
hfileReader.getIndexedRecordsByKeyPrefixIterator(Arrays.asList("key1", "key0"), avroSchema);
recordsByPrefix =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
.map(r -> (GenericRecord)r)
.collect(Collectors.toList());
Collections.sort(recordsByPrefix, new Comparator<GenericRecord>() {
@Override
public int compare(GenericRecord o1, GenericRecord o2) {
return o1.get("_row_key").toString().compareTo(o2.get("_row_key").toString());
}
});
assertEquals(expectedKey1s, recordsByPrefix);
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -182,7 +180,7 @@ protected <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(byte[] conten

// TODO abstract this w/in HoodieDataBlock
@Override
protected <T> ClosableIterator<HoodieRecord<T>> lookupRecords(List<String> keys, boolean fullKey) throws IOException {
protected <T> ClosableIterator<HoodieRecord<T>> lookupRecords(List<String> sortedKeys, boolean fullKey) throws IOException {
HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get();

// NOTE: It's important to extend Hadoop configuration here to make sure configuration
Expand All @@ -195,11 +193,6 @@ protected <T> ClosableIterator<HoodieRecord<T>> lookupRecords(List<String> keys,
blockContentLoc.getContentPositionInLogFile(),
blockContentLoc.getBlockSize());

// HFile read will be efficient if keys are sorted, since on storage records are sorted by key.
// This will avoid unnecessary seeks.
List<String> sortedKeys = new ArrayList<>(keys);
Collections.sort(sortedKeys);

final HoodieAvroHFileReader reader =
new HoodieAvroHFileReader(inlineConf, inlinePath, new CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf),
Option.of(getSchemaFromHeader()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,26 +121,18 @@ public HoodieAvroHFileReader(Path path, HFile.Reader reader, Option<Schema> sche
}

/**
 * Fetches the record stored under exactly {@code key}, using the reader's shared scanner.
 *
 * <p>The lookup runs while holding {@code sharedScannerLock}, which guards the shared scanner.
 *
 * @param key          record key to look up
 * @param readerSchema schema handed to the lookup alongside the file's own schema
 * @return the record wrapped as a {@link HoodieAvroIndexedRecord}, or an empty option when
 *         the internal lookup yields nothing
 * @throws IOException if the underlying lookup fails
 */
@Override
public Option<HoodieRecord<IndexedRecord>> getRecordByKey(String key, Schema readerSchema) throws IOException {
  synchronized (sharedScannerLock) {
    Option<IndexedRecord> fetched = fetchRecordByKeyInternal(sharedScanner, key, getSchema(), readerSchema);
    return fetched.map(data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
  }
}

@Override
public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeysIterator(List<String> keys, Schema schema) throws IOException {
public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeysIterator(List<String> sortedKeys, Schema schema) throws IOException {
// We're caching blocks for this scanner to minimize amount of traffic
// to the underlying storage as we fetched (potentially) sparsely distributed
// keys
HFileScanner scanner = getHFileScanner(reader, true);
ClosableIterator<IndexedRecord> iterator = new RecordByKeyIterator(scanner, keys, getSchema(), schema);
ClosableIterator<IndexedRecord> iterator = new RecordByKeyIterator(scanner, sortedKeys, getSchema(), schema);
return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
}

@Override
public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema schema) throws IOException {
ClosableIterator<IndexedRecord> iterator = getIndexedRecordsByKeyPrefixIterator(keyPrefixes, schema);
public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeyPrefixIterator(List<String> sortedKeyPrefixes, Schema schema) throws IOException {
ClosableIterator<IndexedRecord> iterator = getIndexedRecordsByKeyPrefixIterator(sortedKeyPrefixes, schema);
return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
}

Expand Down Expand Up @@ -206,7 +198,12 @@ protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema reader
}

// TODO eval whether seeking scanner would be faster than pread
HFileScanner scanner = getHFileScanner(reader, false);
HFileScanner scanner = null;
try {
scanner = getHFileScanner(reader, false, false);
} catch (IOException e) {
throw new HoodieIOException("Instantiation HfileScanner failed for " + reader.getHFileInfo().toString());
}
return new RecordIterator(scanner, getSchema(), readerSchema);
}

Expand All @@ -220,12 +217,12 @@ protected ClosableIterator<IndexedRecord> getIndexedRecordsByKeysIterator(List<S
}

@VisibleForTesting
protected ClosableIterator<IndexedRecord> getIndexedRecordsByKeyPrefixIterator(List<String> keyPrefixes, Schema readerSchema) throws IOException {
protected ClosableIterator<IndexedRecord> getIndexedRecordsByKeyPrefixIterator(List<String> sortedKeyPrefixes, Schema readerSchema) throws IOException {
// We're caching blocks for this scanner to minimize amount of traffic
// to the underlying storage as we fetched (potentially) sparsely distributed
// keys
HFileScanner scanner = getHFileScanner(reader, true);
return new RecordByKeyPrefixIterator(scanner, keyPrefixes, getSchema(), readerSchema);
return new RecordByKeyPrefixIterator(scanner, sortedKeyPrefixes, getSchema(), readerSchema);
}

@Override
Expand Down Expand Up @@ -261,30 +258,30 @@ private static Iterator<IndexedRecord> getRecordByKeyPrefixIteratorInternal(HFil
//
// For the HFile containing list of cells c[0], c[1], ..., c[N], `seekTo(cell)` would return
// following:
// a) -1, if cell < c[0], no position;
// a) -1, if cell < c[0], no position; but w/ prefix search, if the key to be searched is less than first entry, the cursor will be placed at the beginning.
// scanner.getCell() could return the next key which could match the key we are interested in.
// b) 0, such that c[i] = cell and scanner is left in position i;
// c) and 1, such that c[i] < cell, and scanner is left in position i.
// c) and 1, such that c[i] < cell, and scanner is left in position i. If the key falls somewhere in the middle of the file and there is no exact match, scanner.next()
// is expected to return the next entry, which could potentially match the prefix we are looking for. If the key being looked up is > the last entry in the file,
// the cursor could be placed at the end; hence we need to check the result of scanner.next() to know whether there are entries left to read or we have reached EOF.
//
// Consider entries w/ the following keys in HFile: [key01, key02, key03, key04,..., key20];
// In case looked up key-prefix is
// - "key", `seekTo()` will return -1 and place the cursor just before "key01",
// - "key", `reseekTo()` will return -1 and place the cursor just before "key01",
// `getCell()` will return "key01" entry
// - "key03", `seekTo()` will return 0 (exact match) and place the cursor just before "key03",
// - "key03", `reseekTo()` will return 0 (exact match) and place the cursor just before "key03",
// `getCell()` will return "key03" entry
// - "key1", `seekTo()` will return 1 (first not lower than) and place the cursor just before
// "key10" (i.e. on "key09");
// - "key1", `reseekTo()` will return 1 (first not lower than) and leave the cursor at key10 (assuming there is no exact match i.e. key1)
//
int val = scanner.seekTo(kv);
// Do remember that reseek will not do any rewind. So, after searching for key05(cursor is placed at key05), if you search for key01(less than current cursor position),
// it may not return any results.
int val = scanner.reseekTo(kv);
if (val == 1) {
// Try moving to next entry, matching the prefix key; if we're at the EOF,
// `next()` will return false
if (!scanner.next()) {
return Collections.emptyIterator();
}
} else if (val == -1) {
// Whenever val == -1 HFile reader will place the pointer right before the first record. We have to advance it to the first record
// of the file to validate whether it matches our search criteria
scanner.seekTo();
}

class KeyPrefixIterator implements Iterator<IndexedRecord> {
Expand Down Expand Up @@ -333,10 +330,26 @@ public IndexedRecord next() {

private static Option<IndexedRecord> fetchRecordByKeyInternal(HFileScanner scanner, String key, Schema writerSchema, Schema readerSchema) throws IOException {
KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
if (scanner.seekTo(kv) != 0) {
// NOTE: HFile persists both keys/values as bytes, therefore lexicographical sorted is
// essentially employed
//
// For the HFile containing list of cells c[0], c[1], ..., c[N], `seekTo(cell)` would return
// following:
// a) -1, if cell < c[0], no position; if the key to be searched is less than first entry, the cursor will be placed at the beginning.
// b) 0, such that c[i] = cell and scanner is left in position i;
// c) and 1, such that c[i] < cell, and scanner is left in position i.
// In summary, with exact match, we are interested in a return value of 0. In all other cases, the key is not found.
// Also, do remember that we are using reseekTo(), which means the cursor will not rewind after searching for a key.
// Let's say the file contains key01, key02, .., key20.
// After searching for key09, if we search for key05, it may not return the matching entry, since right after reseeking to key09 the cursor is at key09 and
// further reseek calls may not look back at earlier positions.

if (scanner.reseekTo(kv) != 0) {
// key is not found.
return Option.empty();
}

// key is found and the cursor is left where the key is found
Cell c = scanner.getCell();
byte[] valueBytes = copyValueFromCell(c);
GenericRecord record = deserialize(key.getBytes(), valueBytes, writerSchema, readerSchema);
Expand Down Expand Up @@ -424,18 +437,26 @@ public static List<IndexedRecord> readRecords(HoodieAvroHFileReader reader,
.collect(Collectors.toList());
}

private static HFileScanner getHFileScanner(HFile.Reader reader, boolean cacheBlocks) {
private static HFileScanner getHFileScanner(HFile.Reader reader, boolean cacheBlocks) throws IOException {
return getHFileScanner(reader, cacheBlocks, true);
}

private static HFileScanner getHFileScanner(HFile.Reader reader, boolean cacheBlocks, boolean doSeek) throws IOException {
// NOTE: Only scanners created in Positional Read ("pread") mode could share the same reader,
// since scanners in default mode will be seeking w/in the underlying stream
return reader.getScanner(cacheBlocks, true);
HFileScanner scanner = reader.getScanner(cacheBlocks, true);
if (doSeek) {
scanner.seekTo(); // places the cursor at the beginning of the first data block.
}
return scanner;
}

/**
 * Looks up the field named {@code KEY_FIELD_NAME} in the given schema.
 *
 * @param schema schema to inspect
 * @return the key field, or an empty option when the schema has no such field
 */
private static Option<Schema.Field> getKeySchema(Schema schema) {
  Schema.Field keyField = schema.getField(KEY_FIELD_NAME);
  return Option.ofNullable(keyField);
}

private static class RecordByKeyPrefixIterator implements ClosableIterator<IndexedRecord> {
private final Iterator<String> keyPrefixesIterator;
private final Iterator<String> sortedKeyPrefixesIterator;
private Iterator<IndexedRecord> recordsIterator;

private final HFileScanner scanner;
Expand All @@ -445,8 +466,8 @@ private static class RecordByKeyPrefixIterator implements ClosableIterator<Index

private IndexedRecord next = null;

RecordByKeyPrefixIterator(HFileScanner scanner, List<String> keyPrefixes, Schema writerSchema, Schema readerSchema) throws IOException {
this.keyPrefixesIterator = keyPrefixes.iterator();
RecordByKeyPrefixIterator(HFileScanner scanner, List<String> sortedKeyPrefixes, Schema writerSchema, Schema readerSchema) throws IOException {
this.sortedKeyPrefixesIterator = sortedKeyPrefixes.iterator();

this.scanner = scanner;
this.scanner.seekTo(); // position at the beginning of the file
Expand All @@ -465,8 +486,8 @@ public boolean hasNext() {
} else if (recordsIterator != null && recordsIterator.hasNext()) {
next = recordsIterator.next();
return true;
} else if (keyPrefixesIterator.hasNext()) {
String currentKeyPrefix = keyPrefixesIterator.next();
} else if (sortedKeyPrefixesIterator.hasNext()) {
String currentKeyPrefix = sortedKeyPrefixesIterator.next();
recordsIterator =
getRecordByKeyPrefixIteratorInternal(scanner, currentKeyPrefix, writerSchema, readerSchema);
} else {
Expand All @@ -492,7 +513,7 @@ public void close() {
}

private static class RecordByKeyIterator implements ClosableIterator<IndexedRecord> {
private final Iterator<String> keyIterator;
private final Iterator<String> sortedKeyIterator;

private final HFileScanner scanner;

Expand All @@ -501,8 +522,8 @@ private static class RecordByKeyIterator implements ClosableIterator<IndexedReco

private IndexedRecord next = null;

RecordByKeyIterator(HFileScanner scanner, List<String> keys, Schema writerSchema, Schema readerSchema) throws IOException {
this.keyIterator = keys.iterator();
RecordByKeyIterator(HFileScanner scanner, List<String> sortedKeys, Schema writerSchema, Schema readerSchema) throws IOException {
this.sortedKeyIterator = sortedKeys.iterator();

this.scanner = scanner;
this.scanner.seekTo(); // position at the beginning of the file
Expand All @@ -519,8 +540,8 @@ public boolean hasNext() {
return true;
}

while (keyIterator.hasNext()) {
Option<IndexedRecord> value = fetchRecordByKeyInternal(scanner, keyIterator.next(), writerSchema, readerSchema);
while (sortedKeyIterator.hasNext()) {
Option<IndexedRecord> value = fetchRecordByKeyInternal(scanner, sortedKeyIterator.next(), writerSchema, readerSchema);
if (value.isPresent()) {
next = value.get();
return true;
Expand Down
Loading