Skip to content

Commit d74d8e2

Browse files
authored
[HUDI-1335] Introduce FlinkHoodieSimpleIndex to hudi-flink-client (#2271)
1 parent 50ff9ab commit d74d8e2

File tree

2 files changed

+145
-0
lines changed

2 files changed

+145
-0
lines changed

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.hudi.common.util.StringUtils;
3030
import org.apache.hudi.config.HoodieWriteConfig;
3131
import org.apache.hudi.exception.HoodieIndexException;
32+
import org.apache.hudi.index.simple.FlinkHoodieSimpleIndex;
3233
import org.apache.hudi.index.bloom.FlinkHoodieBloomIndex;
3334
import org.apache.hudi.index.state.FlinkInMemoryStateIndex;
3435
import org.apache.hudi.PublicAPIMethod;
@@ -61,6 +62,8 @@ public static FlinkHoodieIndex createIndex(HoodieFlinkEngineContext context, Hoo
6162
return new FlinkInMemoryStateIndex<>(context, config);
6263
case BLOOM:
6364
return new FlinkHoodieBloomIndex(config);
65+
case SIMPLE:
66+
return new FlinkHoodieSimpleIndex<>(config);
6467
default:
6568
throw new HoodieIndexException("Unsupported index type " + config.getIndexType());
6669
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.index.simple;
20+
21+
import org.apache.hudi.client.WriteStatus;
22+
import org.apache.hudi.common.engine.HoodieEngineContext;
23+
import org.apache.hudi.common.model.HoodieBaseFile;
24+
import org.apache.hudi.common.model.HoodieKey;
25+
import org.apache.hudi.common.model.HoodieRecord;
26+
import org.apache.hudi.common.model.HoodieRecordLocation;
27+
import org.apache.hudi.common.model.HoodieRecordPayload;
28+
import org.apache.hudi.common.util.Option;
29+
import org.apache.hudi.common.util.collection.Pair;
30+
import org.apache.hudi.config.HoodieWriteConfig;
31+
import org.apache.hudi.exception.HoodieIndexException;
32+
import org.apache.hudi.index.FlinkHoodieIndex;
33+
import org.apache.hudi.index.HoodieIndexUtils;
34+
import org.apache.hudi.io.HoodieKeyLocationFetchHandle;
35+
import org.apache.hudi.table.HoodieTable;
36+
37+
import avro.shaded.com.google.common.collect.Lists;
38+
39+
import java.util.HashMap;
40+
import java.util.LinkedList;
41+
import java.util.List;
42+
import java.util.Map;
43+
import java.util.Set;
44+
import java.util.stream.Collectors;
45+
46+
import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
47+
48+
/**
49+
* A simple index which reads interested fields(record key and partition path) from base files and
50+
* compares with incoming records to find the tagged location.
51+
*
52+
* @param <T> type of payload
53+
*/
54+
public class FlinkHoodieSimpleIndex<T extends HoodieRecordPayload> extends FlinkHoodieIndex<T> {
55+
56+
public FlinkHoodieSimpleIndex(HoodieWriteConfig config) {
57+
super(config);
58+
}
59+
60+
@Override
61+
public List<WriteStatus> updateLocation(List<WriteStatus> writeStatuses, HoodieEngineContext context,
62+
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException {
63+
return writeStatuses;
64+
}
65+
66+
@Override
67+
public boolean rollbackCommit(String instantTime) {
68+
return true;
69+
}
70+
71+
@Override
72+
public boolean isGlobal() {
73+
return false;
74+
}
75+
76+
@Override
77+
public boolean canIndexLogFiles() {
78+
return false;
79+
}
80+
81+
@Override
82+
public boolean isImplicitWithStorage() {
83+
return true;
84+
}
85+
86+
@Override
87+
public List<HoodieRecord<T>> tagLocation(List<HoodieRecord<T>> hoodieRecords, HoodieEngineContext context,
88+
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) throws HoodieIndexException {
89+
return tagLocationInternal(hoodieRecords, context, hoodieTable);
90+
}
91+
92+
/**
93+
* Tags records location for incoming records.
94+
*/
95+
private List<HoodieRecord<T>> tagLocationInternal(List<HoodieRecord<T>> hoodieRecords, HoodieEngineContext context,
96+
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable) {
97+
Map<HoodieKey, HoodieRecord<T>> keyedInputRecords = context.mapToPair(hoodieRecords, record -> Pair.of(record.getKey(), record), 0);
98+
Map<HoodieKey, HoodieRecordLocation> existingLocationsOnTable = fetchRecordLocationsForAffectedPartitions(keyedInputRecords.keySet(), context, hoodieTable, config.getSimpleIndexParallelism());
99+
List<HoodieRecord<T>> taggedRecords = new LinkedList<>();
100+
101+
for (Map.Entry<HoodieKey, HoodieRecord<T>> hoodieKeyHoodieRecordEntry : keyedInputRecords.entrySet()) {
102+
HoodieKey key = hoodieKeyHoodieRecordEntry.getKey();
103+
HoodieRecord<T> record = hoodieKeyHoodieRecordEntry.getValue();
104+
if (existingLocationsOnTable.containsKey(key)) {
105+
taggedRecords.add(HoodieIndexUtils.getTaggedRecord(record, Option.ofNullable(existingLocationsOnTable.get(key))));
106+
}
107+
}
108+
return taggedRecords;
109+
}
110+
111+
/**
112+
* Fetch record locations for passed in {@link HoodieKey}s.
113+
*
114+
* @param keySet {@link HoodieKey}s for which locations are fetched
115+
* @param context instance of {@link HoodieEngineContext} to use
116+
* @param hoodieTable instance of {@link HoodieTable} of interest
117+
* @param parallelism parallelism to use
118+
* @return {@link Map} of {@link HoodieKey} and {@link HoodieRecordLocation}
119+
*/
120+
private Map<HoodieKey, HoodieRecordLocation> fetchRecordLocationsForAffectedPartitions(Set<HoodieKey> keySet,
121+
HoodieEngineContext context,
122+
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable,
123+
int parallelism) {
124+
List<String> affectedPartitionPathList = keySet.stream().map(HoodieKey::getPartitionPath).distinct().collect(Collectors.toList());
125+
List<Pair<String, HoodieBaseFile>> latestBaseFiles = getLatestBaseFilesForAllPartitions(affectedPartitionPathList, context, hoodieTable);
126+
return fetchRecordLocations(context, hoodieTable, parallelism, latestBaseFiles);
127+
}
128+
129+
private Map<HoodieKey, HoodieRecordLocation> fetchRecordLocations(HoodieEngineContext context,
130+
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> hoodieTable,
131+
int parallelism,
132+
List<Pair<String, HoodieBaseFile>> latestBaseFiles) {
133+
134+
List<HoodieKeyLocationFetchHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>> hoodieKeyLocationFetchHandles =
135+
context.map(latestBaseFiles, partitionPathBaseFile -> new HoodieKeyLocationFetchHandle<>(config, hoodieTable, partitionPathBaseFile), parallelism);
136+
Map<HoodieKey, HoodieRecordLocation> recordLocations = new HashMap<>();
137+
hoodieKeyLocationFetchHandles.stream()
138+
.flatMap(handle -> Lists.newArrayList(handle.locations()).stream())
139+
.forEach(x -> x.forEach(y -> recordLocations.put(y.getKey(), y.getRight())));
140+
return recordLocations;
141+
}
142+
}

0 commit comments

Comments
 (0)