From 71859eef948188566ff065ebcfe1a8da92f5109e Mon Sep 17 00:00:00 2001 From: voonhous Date: Wed, 14 Sep 2022 16:02:58 +0800 Subject: [PATCH] [HUDI-4841] Fix sort idempotency issue --- .../format/cow/CopyOnWriteInputFormat.java | 11 ++-- .../format/cow/TestBlockLocationSort.java | 52 +++++++++++++++++++ 2 files changed, 55 insertions(+), 8 deletions(-) create mode 100644 hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/cow/TestBlockLocationSort.java diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java index f04c23fe91e44..0300889becccf 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.format.cow; +import java.util.Comparator; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader; import org.apache.hudi.util.DataTypeUtils; @@ -42,7 +43,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Comparator; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -214,13 +214,7 @@ public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException { // get the block locations and make sure they are in order with respect to their offset final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, len); - Arrays.sort(blocks, new Comparator() { - @Override - public int compare(BlockLocation o1, BlockLocation o2) { - long diff = o1.getLength() - o2.getOffset(); - return Long.compare(diff, 0L); - } - }); + Arrays.sort(blocks, Comparator.comparingLong(BlockLocation::getOffset)); long bytesUnassigned = len; long position = 0; @@ -393,4 +387,5 @@ private InflaterInputStreamFactory getInflaterInputStreamFactory(org.apache.h return null; } } + } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/cow/TestBlockLocationSort.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/cow/TestBlockLocationSort.java new file mode 100644 index 0000000000000..d868dce4d9153 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/cow/TestBlockLocationSort.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.format.cow; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; + +import java.util.Arrays; +import java.util.Comparator; +import org.apache.hadoop.fs.BlockLocation; +import org.junit.jupiter.api.Test; + +public class TestBlockLocationSort { + + private static BlockLocation createBlockLocation(int offset, int length) { + return new BlockLocation(new String[0], new String[0], offset, length); + } + + @Test + void testBlockLocationSort() { + BlockLocation o1 = createBlockLocation(0, 5); + BlockLocation o2 = createBlockLocation(6, 4); + BlockLocation o3 = createBlockLocation(5, 5); + + BlockLocation[] blocks = {o1, o2, o3}; + BlockLocation[] sortedBlocks = {o1, o3, o2}; + + Arrays.sort(blocks, Comparator.comparingLong(BlockLocation::getOffset)); + assertThat(blocks, equalTo(sortedBlocks)); + + // Sort again to ensure idempotency + Arrays.sort(blocks, Comparator.comparingLong(BlockLocation::getOffset)); + assertThat(blocks, equalTo(sortedBlocks)); + } + +}