@@ -91,13 +91,14 @@ protected List<HoodieRecord<T>> tag(List<HoodieRecord<T>> dedupedRecords, Hoodie
   @Override
   public List<HoodieRecord<T>> deduplicateRecords(
       List<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
+    final boolean hasInsert = records.get(0).getCurrentLocation().getInstantTime().equals("I");

Member:
How about renaming this as isInsertBucket and adding a comment to explain why we need it?
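One way the suggested rename might read (a sketch only; the comment wording reflects the discussion below, not text from the PR):

```java
// The bucket index tags every record of a bucket with instant time "I" (insert)
// or "U" (update). Remember whether this batch came from an insert bucket so the
// marker can be restored after deduplication, which may reorder the records.
final boolean isInsertBucket = records.get(0).getCurrentLocation().getInstantTime().equals("I");
```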


Contributor:
The keyedRecords construction can be made more efficient by grouping on the record key directly, which avoids building the intermediate Pair objects:

    Map<Object, List<HoodieRecord<T>>> keyedRecords = records.stream()
        .collect(Collectors.groupingBy(record -> record.getKey().getRecordKey()));
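For readers less familiar with the stream idiom under discussion, here is a stripped-down, self-contained sketch of the same group-then-reduce shape (the Rec type and the merge rule are simplified placeholders, not Hudi's payload merging):

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class DedupPatternDemo {

  /** A stand-in for HoodieRecord: a key plus an ordering (pre-combine) value. */
  static final class Rec {
    final String key;
    final long orderingValue;

    Rec(String key, long orderingValue) {
      this.key = key;
      this.orderingValue = orderingValue;
    }
  }

  static List<Rec> deduplicate(List<Rec> records) {
    // Group by record key, exactly as the reviewer suggests for keyedRecords.
    Map<String, List<Rec>> keyed = records.stream()
        .collect(Collectors.groupingBy(rec -> rec.key));
    // Reduce each group to a single record; here "merge" simply keeps the
    // record with the larger ordering value, mimicking pre-combine semantics.
    return keyed.values().stream()
        .map(group -> group.stream()
            .reduce((r1, r2) -> r1.orderingValue >= r2.orderingValue ? r1 : r2)
            .orElse(null))
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
  }
}
```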

     Map<Object, List<Pair<Object, HoodieRecord<T>>>> keyedRecords = records.stream().map(record -> {
       // If index used is global, then records are expected to differ in their partitionPath
       final Object key = record.getKey().getRecordKey();
       return Pair.of(key, record);
     }).collect(Collectors.groupingBy(Pair::getLeft));

-    return keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
+    List<HoodieRecord<T>> recordList = keyedRecords.values().stream().map(x -> x.stream().map(Pair::getRight).reduce((rec1, rec2) -> {
       final T data1 = rec1.getData();
       final T data2 = rec2.getData();

@@ -113,5 +114,10 @@ public List<HoodieRecord<T>> deduplicateRecords(
       hoodieRecord.setCurrentLocation(rec1.getCurrentLocation());
       return hoodieRecord;
     }).orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());
+
+    if (hasInsert) {
+      recordList.get(0).getCurrentLocation().setInstantTime("I");
+    }
+    return recordList;

Contributor:
In line 114 we already reset the location, so each record list under the same key should have the same instant time type after the reduction as before. Why is the set needed?


Member:
I wrote a test locally and found that the order of the list changed after the reduction: <id1, id2> somehow became <id2, id1>, so the issue is not tied to a single record.


Contributor:
Yes, Map::values does not guarantee any ordering. The state-index-based writer has no such problem because it assigns the instant markers "I" and "U" based on the buckets of the last checkpoint and reuses those buckets within one checkpoint.

This fix is needed to make the deduplication more robust.
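A small self-contained illustration of the ordering point above (the keys are made up; the only claim is that Collectors.groupingBy gives no ordering guarantee for the map it returns):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupingOrderDemo {
  public static void main(String[] args) {
    List<String> keys = Arrays.asList("id1", "id2", "id3", "id4");

    // groupingBy gives no guarantee about the Map it returns (a HashMap in
    // practice), so the iteration order of values() is unspecified and need
    // not match the order of the input list.
    Map<String, List<String>> grouped = keys.stream()
        .collect(Collectors.groupingBy(key -> key));

    // The first group printed here is not guaranteed to be the group of "id1",
    // which is presumably why the patch remembers the "I" marker before grouping
    // and re-applies it to the first record of the deduplicated list afterwards.
    System.out.println(grouped.values());
  }
}
```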

   }
 }
@@ -191,6 +191,65 @@ public void testMergeOnReadWriteWithCompaction(String indexType) throws Exception
     TestData.checkWrittenFullData(tempFile, EXPECTED);
   }

+  @ParameterizedTest
+  @ValueSource(strings = {"BUCKET"})
+  public void testCopyOnWriteBucketIndex(String indexType) throws Exception {

Member:
Can we use this test for the COW table? Include the state index as well. (One possible way to parameterize it over both index types is sketched after this diff.)

+    int parallelism = 4;
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.INDEX_TYPE, indexType);
+    conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1);
+    conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
+    conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+    conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(parallelism);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    // Read from file source
+    RowType rowType =
+        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+            .getLogicalType();
+
+    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+        rowType,
+        InternalTypeInfo.of(rowType),
+        false,
+        true,
+        TimestampFormat.ISO_8601
+    );
+    String sourcePath = Objects.requireNonNull(Thread.currentThread()
+        .getContextClassLoader().getResource("test_source.data")).toString();
+
+    TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+    format.setFilesFilter(FilePathFilter.createDefaultFilter());
+    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+    format.setCharsetName("UTF-8");
+
+    DataStream<RowData> dataStream = execEnv
+        // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
+        .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
+        .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+        .setParallelism(parallelism);
+
+    DataStream<HoodieRecord> hoodieRecordDataStream = Pipelines.bootstrap(conf, rowType, parallelism, dataStream);
+    DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf, parallelism, hoodieRecordDataStream);
+    Pipelines.clean(conf, pipeline);
+    JobClient client = execEnv.executeAsync(execEnv.getStreamGraph());
+    if (client.getJobStatus().get() != JobStatus.FAILED) {
+      try {
+        TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish

Member:
Is this sleep still needed if we test on COW?

+        client.cancel();
+      } catch (Throwable var1) {
+        // ignored
+      }
+    }
+
+    TestData.checkWrittenFullData(tempFile, EXPECTED);
+  }
+
   private void testWriteToHoodie(
       Transformer transformer,
       Map<String, List<String>> expected) throws Exception {
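As the inline comment above asks about covering the COW table with the state index as well, here is one way the test could be parameterized. This is a sketch under the assumption that the Flink state index is selected with an index type named FLINK_STATE; the method name is hypothetical, and the streaming pipeline body is elided because it would be identical to the test above:

```java
@ParameterizedTest
@ValueSource(strings = {"FLINK_STATE", "BUCKET"})
public void testCopyOnWriteWithIndex(String indexType) throws Exception {
  Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
  conf.setString(FlinkOptions.TABLE_TYPE, HoodieTableType.COPY_ON_WRITE.name());
  conf.setString(FlinkOptions.INDEX_TYPE, indexType);
  conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
  if ("BUCKET".equals(indexType)) {
    // Only the bucket index needs the bucket count and the hash key field.
    conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 1);
    conf.setString(FlinkOptions.INDEX_KEY_FIELD, "id");
  }
  // ... the streaming source, the Pipelines.bootstrap/hoodieStreamWrite wiring and the
  // final TestData.checkWrittenFullData assertion stay the same as in the test above.
}
```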