diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java index c93907c4a33bf..5fcd9dfd60be4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -22,10 +22,12 @@ import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.exception.HoodieException; import java.io.File; import java.io.FileReader; import java.io.IOException; +import java.util.Locale; import java.util.Properties; /** @@ -40,6 +42,9 @@ public class HoodieClusteringConfig extends HoodieConfig { // Any strategy specific params can be saved with this prefix public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy."; + // Any Space-filling curves optimize(z-order/hilbert) params can be saved with this prefix + public static final String LAYOUT_OPTIMIZE_PARAM_PREFIX = "hoodie.layout.optimize."; + public static final ConfigProperty DAYBASED_LOOKBACK_PARTITIONS = ConfigProperty .key(CLUSTERING_STRATEGY_PARAM_PREFIX + "daybased.lookback.partitions") .defaultValue("2") @@ -137,6 +142,55 @@ public class HoodieClusteringConfig extends HoodieConfig { .sinceVersion("0.9.0") .withDocumentation("When rewriting data, preserves existing hoodie_commit_time"); + public static final ConfigProperty LAYOUT_OPTIMIZE_ENABLE = ConfigProperty + .key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "enable") + .defaultValue(false) + .sinceVersion("0.10.0") + .withDocumentation("Enable use z-ordering/space-filling curves to optimize the layout of table to boost query performance. 
" + + "This parameter takes precedence over clustering strategy set using " + EXECUTION_STRATEGY_CLASS_NAME.key()); + + public static final ConfigProperty LAYOUT_OPTIMIZE_STRATEGY = ConfigProperty + .key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "strategy") + .defaultValue("z-order") + .sinceVersion("0.10.0") + .withDocumentation("Type of layout optimization to be applied, current only supports `z-order` and `hilbert` curves."); + + /** + * There exists two method to build z-curve. + * one is directly mapping sort cols to z-value to build z-curve; + * we can find this method in Amazon DynamoDB https://aws.amazon.com/cn/blogs/database/tag/z-order/ + * the other one is Boundary-based Interleaved Index method which we proposed. simply call it sample method. + * Refer to rfc-28 for specific algorithm flow. + * Boundary-based Interleaved Index method has better generalization, but the build speed is slower than direct method. + */ + public static final ConfigProperty LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD = ConfigProperty + .key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "curve.build.method") + .defaultValue("direct") + .sinceVersion("0.10.0") + .withDocumentation("Controls how data is sampled to build the space filling curves. two methods: `direct`,`sample`." + + "The direct method is faster than the sampling, however sample method would produce a better data layout."); + /** + * Doing sample for table data is the first step in Boundary-based Interleaved Index method. + * larger sample number means better optimize result, but more memory consumption + */ + public static final ConfigProperty LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE = ConfigProperty + .key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "build.curve.sample.size") + .defaultValue("200000") + .sinceVersion("0.10.0") + .withDocumentation("when setting" + LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD.key() + " to `sample`, the amount of sampling to be done." 
+ + "Large sample size leads to better results, at the expense of more memory usage."); + + /** + * The best way to use Z-order/Space-filling curves is to cooperate with Data-Skipping + * with data-skipping query engine can greatly reduce the number of table files to be read. + * otherwise query engine can only do row-group skipping for files (parquet/orc) + */ + public static final ConfigProperty LAYOUT_OPTIMIZE_DATA_SKIPPING_ENABLE = ConfigProperty + .key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "data.skipping.enable") + .defaultValue(true) + .sinceVersion("0.10.0") + .withDocumentation("Enable data skipping by collecting statistics once layout optimization is complete."); + /** * @deprecated Use {@link #PLAN_STRATEGY_CLASS_NAME} and its methods instead */ @@ -350,9 +404,58 @@ public Builder withPreserveHoodieCommitMetadata(Boolean preserveHoodieCommitMeta return this; } + public Builder withSpaceFillingCurveDataOptimizeEnable(Boolean enable) { + clusteringConfig.setValue(LAYOUT_OPTIMIZE_ENABLE, String.valueOf(enable)); + return this; + } + + public Builder withDataOptimizeStrategy(String strategy) { + clusteringConfig.setValue(LAYOUT_OPTIMIZE_STRATEGY, strategy); + return this; + } + + public Builder withDataOptimizeBuildCurveStrategy(String method) { + clusteringConfig.setValue(LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD, method); + return this; + } + + public Builder withDataOptimizeBuildCurveSampleNumber(int sampleNumber) { + clusteringConfig.setValue(LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE, String.valueOf(sampleNumber)); + return this; + } + + public Builder withDataOptimizeDataSkippingEnable(boolean dataSkipping) { + clusteringConfig.setValue(LAYOUT_OPTIMIZE_DATA_SKIPPING_ENABLE, String.valueOf(dataSkipping)); + return this; + } + public HoodieClusteringConfig build() { clusteringConfig.setDefaults(HoodieClusteringConfig.class.getName()); return clusteringConfig; } } + + /** + * strategy types for build z-ordering/space-filling curves. 
+ */ + public enum BuildCurveStrategyType { + DIRECT("direct"), + SAMPLE("sample"); + private final String value; + + BuildCurveStrategyType(String value) { + this.value = value; + } + + public static BuildCurveStrategyType fromValue(String value) { + switch (value.toLowerCase(Locale.ROOT)) { + case "direct": + return DIRECT; + case "sample": + return SAMPLE; + default: + throw new HoodieException("Invalid value of Type."); + } + } + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index c9d8c4f117eaf..aeb77db187dfe 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1228,6 +1228,30 @@ public String getClusteringSortColumns() { return getString(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS); } + /** + * Data layout optimize properties. + */ + public boolean isLayoutOptimizationEnabled() { + return getBoolean(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE); + } + + public String getLayoutOptimizationStrategy() { + return getString(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY); + } + + public HoodieClusteringConfig.BuildCurveStrategyType getLayoutOptimizationCurveBuildMethod() { + return HoodieClusteringConfig.BuildCurveStrategyType.fromValue( + getString(HoodieClusteringConfig.LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD)); + } + + public int getLayoutOptimizationSampleSize() { + return getInt(HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE); + } + + public boolean isDataSkippingEnabled() { + return getBoolean(HoodieClusteringConfig.LAYOUT_OPTIMIZE_DATA_SKIPPING_ENABLE); + } + /** * index properties. 
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * Helpers for building Z-order (space-filling curve) sort keys.
 *
 * <p>Original location: {@code org.apache.hudi.optimize.ZOrderingUtil} (package
 * declaration omitted here; restore it when splicing into the source tree).
 *
 * <p>Every {@code *To8Byte} method maps a value to a fixed 8-byte key whose
 * <em>unsigned lexicographic</em> order matches the natural order of the input;
 * {@link #interleaving(byte[][], int)} then bit-interleaves those keys into a
 * single Z-curve address.
 */
public class ZOrderingUtil {

  private ZOrderingUtil() {
    // Static utility class — no instances.
  }

  /**
   * Lexicographically compares two byte ranges, treating each byte as unsigned
   * (adapted from HBase's {@code Bytes.compareTo}).
   *
   * @param buffer1 left operand
   * @param offset1 where to start comparing in the left buffer
   * @param length1 how much to compare from the left buffer
   * @param buffer2 right operand
   * @param offset2 where to start comparing in the right buffer
   * @param length2 how much to compare from the right buffer
   * @return 0 if equal, negative if left sorts first, positive otherwise
   */
  public static int compareTo(byte[] buffer1, int offset1, int length1,
      byte[] buffer2, int offset2, int length2) {
    // Short circuit when both refer to the exact same range.
    if (buffer1 == buffer2
        && offset1 == offset2
        && length1 == length2) {
      return 0;
    }
    int end1 = offset1 + length1;
    int end2 = offset2 + length2;
    for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
      // Mask to compare as unsigned bytes.
      int a = (buffer1[i] & 0xff);
      int b = (buffer2[j] & 0xff);
      if (a != b) {
        return a - b;
      }
    }
    // Equal common prefix: the shorter range sorts first.
    return length1 - length2;
  }

  /**
   * Right-aligns {@code a} into an 8-byte array, zero-padding on the left.
   * Inputs longer than 8 bytes are truncated to their first 8 bytes.
   */
  public static byte[] paddingTo8Byte(byte[] a) {
    if (a.length == 8) {
      return a;
    }
    if (a.length > 8) {
      return Arrays.copyOf(a, 8);
    }
    // new byte[8] is already zero-filled, so only the tail needs copying.
    byte[] result = new byte[8];
    System.arraycopy(a, 0, result, 8 - a.length, a.length);
    return result;
  }

  /**
   * Bit-interleaves the given fixed-width keys: the output bit stream is bit 0
   * of key 0, bit 0 of key 1, …, then bit 1 of key 0, bit 1 of key 1, and so
   * on through the last bit of the last key (classic Z-curve construction).
   *
   * @param buffer one {@code size}-byte key per sort column
   * @param size   width in bytes of each key in {@code buffer}
   * @return the interleaved key, {@code size * buffer.length} bytes long
   */
  public static byte[] interleaving(byte[][] buffer, int size) {
    int candidateSize = buffer.length;
    byte[] result = new byte[size * candidateSize];
    int resBitPos = 0;
    int totalBits = size * 8;
    for (int bitStep = 0; bitStep < totalBits; bitStep++) {
      // Integer division/modulo locate the source bit; the previous
      // (int) Math.floor(...) on an int division was a no-op.
      int currentBytePos = bitStep / 8;
      int currentBitPos = bitStep % 8;

      for (int i = 0; i < candidateSize; i++) {
        int tempResBytePos = resBitPos / 8;
        int tempResBitPos = resBitPos % 8;
        result[tempResBytePos] =
            updatePos(result[tempResBytePos], tempResBitPos, buffer[i][currentBytePos], currentBitPos);
        resBitPos++;
      }
    }
    return result;
  }

  /**
   * Returns {@code a} with its bit at position {@code apos} (0 = most
   * significant) replaced by the bit of {@code b} at position {@code bpos}.
   */
  public static byte updatePos(byte a, int apos, byte b, int bpos) {
    byte temp = (byte) (b & (1 << (7 - bpos)));
    if (apos < bpos) {
      temp = (byte) (temp << (bpos - apos));
    }
    if (apos > bpos) {
      temp = (byte) (temp >> (apos - bpos));
    }
    byte atemp = (byte) (a & (1 << (7 - apos)));
    if ((byte) (atemp ^ temp) == 0) {
      return a;  // bit already has the desired value
    }
    return (byte) (a ^ (1 << (7 - apos)));  // flip the target bit
  }

  /** Serializes an int to 4 bytes, big-endian. */
  public static byte[] toBytes(int val) {
    byte[] b = new byte[4];
    for (int i = 3; i > 0; i--) {
      b[i] = (byte) val;
      val >>>= 8;
    }
    b[0] = (byte) val;
    return b;
  }

  /** Serializes a long to 8 bytes, big-endian. */
  public static byte[] toBytes(long val) {
    long temp = val;
    byte[] b = new byte[8];
    for (int i = 7; i > 0; i--) {
      b[i] = (byte) temp;
      temp >>>= 8;
    }
    b[0] = (byte) temp;
    return b;
  }

  /** Serializes a double's raw IEEE-754 bits to 8 bytes, big-endian. */
  public static byte[] toBytes(final double d) {
    return toBytes(Double.doubleToRawLongBits(d));
  }

  /**
   * Maps a signed int to an order-preserving 8-byte key by flipping the sign
   * bit (so Integer.MIN_VALUE maps to 0x00000000) and left-padding with zeros.
   */
  public static byte[] intTo8Byte(int a) {
    int temp = a;
    temp = temp ^ (1 << 31);
    return paddingTo8Byte(toBytes(temp));
  }

  /**
   * Left-pads a single byte to 8 bytes.
   * NOTE(review): unlike {@link #intTo8Byte} this does NOT flip the sign bit,
   * so negative bytes sort after positive ones — looks intended for unsigned
   * byte values only; confirm against callers.
   */
  public static byte[] byteTo8Byte(byte a) {
    return paddingTo8Byte(new byte[] {a});
  }

  /** Maps a signed long to an order-preserving 8-byte key (sign-bit flip). */
  public static byte[] longTo8Byte(long a) {
    long temp = a;
    temp = temp ^ (1L << 63);
    return toBytes(temp);
  }

  /**
   * Maps a double to an order-preserving 8-byte key using the standard
   * IEEE-754 sortable-bits trick: flip only the sign bit of non-negative
   * values, flip all bits of negative values.
   *
   * <p>Fix: the previous {@code a > 0 / a < 0} tests left {@code 0.0}
   * untransformed, so +0.0 kept raw bits 0x00… and sorted below every
   * negative number. Behavior for all non-zero inputs is unchanged.
   */
  public static byte[] doubleTo8Byte(double a) {
    long bits = Double.doubleToRawLongBits(a);
    if (bits >= 0) {
      bits ^= Long.MIN_VALUE; // non-negative (incl. +0.0): set the sign bit
    } else {
      bits = ~bits;           // negative: invert everything
    }
    return toBytes(bits);
  }

  /**
   * Encodes a string as UTF-8 and right-aligns the first 8 bytes into an
   * 8-byte key (short strings are left-padded with zeros).
   */
  public static byte[] utf8To8Byte(String a) {
    return paddingTo8Byte(a.getBytes(StandardCharsets.UTF_8));
  }

  /** Packs {@link #utf8To8Byte(String)}'s key into a big-endian long. */
  public static Long convertStringToLong(String a) {
    byte[] bytes = utf8To8Byte(a);
    long temp = 0L;
    for (int i = 7; i >= 0; i--) {
      temp = temp | (((long) bytes[i] & 0xff) << (7 - i) * 8);
    }
    return temp;
  }
}
+ * + * @param context HoodieEngineContext + * @param instantTime Instant time for the replace action + * @param isOptimizeOperation whether current operation is OPTIMIZE type + */ + public abstract void updateStatistics(HoodieEngineContext context, List stats, String instantTime, Boolean isOptimizeOperation); + public HoodieWriteConfig getConfig() { return config; } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java new file mode 100644 index 0000000000000..7dab6c2057c77 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/optimize/TestZOrderingUtil.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.optimize; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestZOrderingUtil { + + @Test + public void testIntConvert() { + // test Int + int[] testInt = new int[] {-1, 1, -2, 10000, -100000, 2, Integer.MAX_VALUE, Integer.MIN_VALUE}; + List> valueWrappers = new ArrayList<>(); + List> convertResultWrappers = new ArrayList<>(); + for (int i = 0; i < testInt.length; i++) { + valueWrappers.add(new OrginValueWrapper<>(i, testInt[i])); + convertResultWrappers.add(new ConvertResultWrapper<>(i, ZOrderingUtil.intTo8Byte(testInt[i]))); + } + + Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); + + Collections.sort(convertResultWrappers, ((o1, o2) -> ZOrderingUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + + for (int i = 0; i < testInt.length; i++) { + assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); + } + } + + @Test + public void testLongConvert() { + // test Long + long[] testLong = new long[] {-1L, 1L, -2L, 10000L, -100000L, 2L, Long.MAX_VALUE, Long.MIN_VALUE}; + List> valueWrappers = new ArrayList<>(); + List> convertResultWrappers = new ArrayList<>(); + for (int i = 0; i < testLong.length; i++) { + valueWrappers.add(new OrginValueWrapper<>((long)i, testLong[i])); + convertResultWrappers.add(new ConvertResultWrapper<>((long)i, ZOrderingUtil.longTo8Byte(testLong[i]))); + } + + Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); + + Collections.sort(convertResultWrappers, ((o1, o2) -> ZOrderingUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + + for (int i = 0; i < testLong.length; i++) { + assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); + } + } + + @Test + public void testDoubleConvert() { 
+ // test Long + double[] testDouble = new double[] {-1.00d, 1.05d, -2.3d, 10000.002d, -100000.7d, 2.9d, Double.MAX_VALUE}; + List> valueWrappers = new ArrayList<>(); + List> convertResultWrappers = new ArrayList<>(); + for (int i = 0; i < testDouble.length; i++) { + valueWrappers.add(new OrginValueWrapper<>((Double)(i * 1.0), testDouble[i])); + convertResultWrappers.add(new ConvertResultWrapper<>((Double)(i * 1.0), ZOrderingUtil.doubleTo8Byte(testDouble[i]))); + } + + Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); + + Collections.sort(convertResultWrappers, ((o1, o2) -> ZOrderingUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + + for (int i = 0; i < testDouble.length; i++) { + assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); + } + } + + @Test + public void testFloatConvert() { + // test Long + float[] testDouble = new float[] {-1.00f, 1.05f, -2.3f, 10000.002f, -100000.7f, 2.9f, Float.MAX_VALUE, Float.MIN_VALUE}; + List> valueWrappers = new ArrayList<>(); + List> convertResultWrappers = new ArrayList<>(); + for (int i = 0; i < testDouble.length; i++) { + valueWrappers.add(new OrginValueWrapper<>((float)(i * 1.0), testDouble[i])); + convertResultWrappers.add(new ConvertResultWrapper<>((float)(i * 1.0), ZOrderingUtil.doubleTo8Byte((double) testDouble[i]))); + } + + Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); + + Collections.sort(convertResultWrappers, ((o1, o2) -> ZOrderingUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + + for (int i = 0; i < testDouble.length; i++) { + assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); + } + } + + private class ConvertResultWrapper { + T index; + byte[] result; + public ConvertResultWrapper(T index, byte[] result) { + this.index = index; + this.result = result; + } + } + + private class OrginValueWrapper { + T index; + T 
originValue; + public OrginValueWrapper(T index, T originValue) { + this.index = index; + this.originValue = originValue; + } + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index ae0ced2c819ff..8c2089963717c 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; @@ -232,6 +233,11 @@ public HoodieWriteMetadata deletePartitions(HoodieEngineContext context, String throw new HoodieNotSupportedException("DeletePartitions is not supported yet"); } + @Override + public void updateStatistics(HoodieEngineContext context, List stats, String instantTime, Boolean isOptimizeOperation) { + throw new HoodieNotSupportedException("update statistics is not supported yet"); + } + @Override public HoodieWriteMetadata> upsertPrepped(HoodieEngineContext context, String instantTime, List> preppedRecords) { throw new HoodieNotSupportedException("This method should not be invoked"); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index 9d96ca1de99c4..ba3af89418051 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; @@ -144,6 +145,11 @@ public HoodieWriteMetadata> insertOverwriteTable(HoodieEngineC context, config, this, instantTime, records).execute(); } + @Override + public void updateStatistics(HoodieEngineContext context, List stats, String instantTime, Boolean isOptimizeOperation) { + throw new HoodieNotSupportedException("update statistics is not supported yet"); + } + @Override public Option scheduleCompaction(HoodieEngineContext context, String instantTime, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 4100b0463e026..173276d984df0 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -380,6 +380,10 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD performClusteringWithRecordsRDD(final JavaRDD> getPartitioner(Map strategyParams, Schema schema) { - if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) { + if (getWriteConfig().isLayoutOptimizationEnabled()) { + // sort input records by z-order/hilbert + return Option.of(new RDDSpatialCurveOptimizationSortPartitioner((HoodieSparkEngineContext) getEngineContext(), + getWriteConfig(), HoodieAvroUtils.addMetadataFields(schema))); + } else if 
(strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) { return Option.of(new RDDCustomColumnsSortPartitioner(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","), HoodieAvroUtils.addMetadataFields(schema))); } else { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java new file mode 100644 index 0000000000000..fa12159eeac62 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveOptimizationSortPartitioner.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.hudi.execution.bulkinsert;

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.BulkInsertPartitioner;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.ZCurveOptimizeHelper;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/**
 * A partitioner that does spatial-curve optimization sorting based on specified column
 * values for each RDD partition.
 * Supports z-curve optimization; hilbert will come soon.
 *
 * @param <T> HoodieRecordPayload type
 */
public class RDDSpatialCurveOptimizationSortPartitioner<T extends HoodieRecordPayload>
    implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
  private final HoodieSparkEngineContext sparkEngineContext;
  private final SerializableSchema serializableSchema;
  private final HoodieWriteConfig config;

  public RDDSpatialCurveOptimizationSortPartitioner(HoodieSparkEngineContext sparkEngineContext, HoodieWriteConfig config, Schema schema) {
    this.sparkEngineContext = sparkEngineContext;
    this.config = config;
    this.serializableSchema = new SerializableSchema(schema);
  }

  @Override
  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
    String payloadClass = config.getPayloadClass();
    // Sort the records along the space-filling curve, then rebuild
    // HoodieRecords from the sorted avro rows.
    JavaRDD<GenericRecord> preparedRecord = prepareGenericRecord(records, outputSparkPartitions, serializableSchema.get());
    return preparedRecord.map(record -> {
      // Record key / partition path are read back from the hoodie metadata
      // columns that the incoming records already carry.
      String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
      String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
      HoodieKey hoodieKey = new HoodieKey(key, partition);
      // Re-wrap the sorted avro record in the configured payload class via
      // reflection, preserving the table's payload semantics.
      HoodieRecordPayload avroPayload = ReflectionUtils.loadPayload(payloadClass,
          new Object[] {Option.of(record)}, Option.class);
      HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, avroPayload);
      return hoodieRecord;
    });
  }

  /**
   * Converts the records to a DataFrame, applies the configured curve-building
   * method (direct vs. sample — see HoodieClusteringConfig), and converts the
   * z-ordered rows back to an avro RDD.
   *
   * NOTE(review): getInsertValue(...).get() assumes every payload produces a
   * non-empty insert value (i.e. no delete payloads reach this path) — confirm
   * with callers. The locally created SerializableSchema shadows the field of
   * the same name; both wrap the same schema.
   */
  private JavaRDD<GenericRecord> prepareGenericRecord(JavaRDD<HoodieRecord<T>> inputRecords, final int numOutputGroups, final Schema schema) {
    SerializableSchema serializableSchema = new SerializableSchema(schema);
    JavaRDD<GenericRecord> genericRecordJavaRDD = inputRecords.map(f -> (GenericRecord) f.getData().getInsertValue(serializableSchema.get()).get());
    Dataset<Row> originDF = AvroConversionUtils.createDataFrame(genericRecordJavaRDD.rdd(), schema.toString(), sparkEngineContext.getSqlContext().sparkSession());
    Dataset<Row> zDataFrame;

    switch (config.getLayoutOptimizationCurveBuildMethod()) {
      case DIRECT:
        zDataFrame = ZCurveOptimizeHelper.createZIndexedDataFrameByMapValue(originDF, config.getClusteringSortColumns(), numOutputGroups);
        break;
      case SAMPLE:
        zDataFrame = ZCurveOptimizeHelper.createZIndexedDataFrameBySample(originDF, config.getClusteringSortColumns(), numOutputGroups);
        break;
      default:
        throw new HoodieException("Not a valid build curve method for doWriteOperation: ");
    }
    return HoodieSparkUtils.createRdd(zDataFrame, schema.getName(),
        schema.getNamespace(), false, org.apache.hudi.common.util.Option.empty()).toJavaRDD();
  }

  @Override
  public boolean arePartitionRecordsSorted() {
    // Records come back from the curve-ordered DataFrame already sorted
    // within each output partition.
    return true;
  }
}
import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Implementation of a very heavily read-optimized Hoodie Table where, all data is stored in base files, with @@ -152,6 +157,32 @@ public HoodieWriteMetadata> insertOverwriteTable(HoodieEngi return new SparkInsertOverwriteTableCommitActionExecutor(context, config, this, instantTime, records).execute(); } + @Override + public void updateStatistics(HoodieEngineContext context, List stats, String instantTime, Boolean isOptimizeOperation) { + // deal with z-order/hilbert statistic info + if (isOptimizeOperation) { + updateOptimizeOperationStatistics(context, stats, instantTime); + } + } + + private void updateOptimizeOperationStatistics(HoodieEngineContext context, List stats, String instantTime) { + String cols = config.getClusteringSortColumns(); + String basePath = metaClient.getBasePath(); + String indexPath = metaClient.getZindexPath(); + List validateCommits = metaClient.getCommitsTimeline() + .filterCompletedInstants().getInstants().map(f -> f.getTimestamp()).collect(Collectors.toList()); + List touchFiles = stats.stream().map(s -> new Path(basePath, s.getPath()).toString()).collect(Collectors.toList()); + if (touchFiles.isEmpty() || cols.isEmpty() || indexPath.isEmpty()) { + LOG.warn("save nothing to index table"); + return; + } + HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)context; + ZCurveOptimizeHelper.saveStatisticsInfo(sparkEngineContext + .getSqlContext().sparkSession().read().load(JavaConversions.asScalaBuffer(touchFiles)), + cols, indexPath, instantTime, validateCommits); + LOG.info(String.format("save statistic info sucessfully at commitTime: %s", instantTime)); + } + @Override public Option scheduleCompaction(HoodieEngineContext context, String instantTime, Option> extraMetadata) { throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table"); diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java index 322d19194ae81..9013901c9a2ee 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBulkInsertHelper.java @@ -69,7 +69,7 @@ public HoodieWriteMetadata> bulkInsert(final JavaRDD writeStatuses = bulkInsert(inputRecords, instantTime, table, config, performDedupe, userDefinedBulkInsertPartitioner, false, config.getBulkInsertShuffleParallelism(), false); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/ZCurveOptimizeHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/ZCurveOptimizeHelper.java new file mode 100644 index 0000000000000..7ba1c9465bfd0 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/spark/ZCurveOptimizeHelper.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark; + +import scala.collection.JavaConversions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.HoodieSparkUtils$; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieColumnRangeMetadata; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ParquetUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.optimize.ZOrderingUtil; +import org.apache.parquet.io.api.Binary; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.Row$; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.hudi.execution.RangeSampleSort$; +import org.apache.spark.sql.hudi.execution.ZorderingBinarySort; +import org.apache.spark.sql.types.BinaryType; +import org.apache.spark.sql.types.BinaryType$; +import org.apache.spark.sql.types.BooleanType; +import org.apache.spark.sql.types.ByteType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DateType; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.DoubleType; +import org.apache.spark.sql.types.FloatType; +import org.apache.spark.sql.types.IntegerType; +import org.apache.spark.sql.types.LongType; +import org.apache.spark.sql.types.LongType$; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.ShortType; +import org.apache.spark.sql.types.StringType; +import org.apache.spark.sql.types.StringType$; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType$; +import org.apache.spark.sql.types.TimestampType; +import 
org.apache.spark.util.SerializableConfiguration; + +import java.io.IOException; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class ZCurveOptimizeHelper { + + private static final String SPARK_JOB_DESCRIPTION = "spark.job.description"; + + /** + * Create z-order DataFrame directly + * first, map all base type data to byte[8], then create z-order DataFrame + * only support base type data. long,int,short,double,float,string,timestamp,decimal,date,byte + * this method is more effective than createZIndexDataFrameBySample + * + * @param df a spark DataFrame holds parquet files to be read. + * @param zCols z-sort cols + * @param fileNum spark partition num + * @return a dataFrame sorted by z-order. + */ + public static Dataset createZIndexedDataFrameByMapValue(Dataset df, List zCols, int fileNum) { + Map columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e)); + int fieldNum = df.schema().fields().length; + List checkCols = zCols.stream().filter(f -> columnsMap.containsKey(f)).collect(Collectors.toList()); + if (zCols.size() != checkCols.size()) { + return df; + } + // only one col to sort, no need to use z-order + if (zCols.size() == 1) { + return df.repartitionByRange(fieldNum, org.apache.spark.sql.functions.col(zCols.get(0))); + } + Map fieldMap = zCols + .stream().collect(Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), e -> columnsMap.get(e))); + // z-sort + JavaRDD sortedRdd = df.toJavaRDD().map(row -> { + List zBytesList = fieldMap.entrySet().stream().map(entry -> { + int index = entry.getKey(); + StructField field = entry.getValue(); + DataType dataType = field.dataType(); + if (dataType instanceof LongType) { + return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? 
Long.MAX_VALUE : row.getLong(index)); + } else if (dataType instanceof DoubleType) { + return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index)); + } else if (dataType instanceof IntegerType) { + return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index)); + } else if (dataType instanceof FloatType) { + return ZOrderingUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index)); + } else if (dataType instanceof StringType) { + return ZOrderingUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index)); + } else if (dataType instanceof DateType) { + return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime()); + } else if (dataType instanceof TimestampType) { + return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime()); + } else if (dataType instanceof ByteType) { + return ZOrderingUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index)); + } else if (dataType instanceof ShortType) { + return ZOrderingUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index)); + } else if (dataType instanceof DecimalType) { + return ZOrderingUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue()); + } else if (dataType instanceof BooleanType) { + boolean value = row.isNullAt(index) ? false : row.getBoolean(index); + return ZOrderingUtil.intTo8Byte(value ? 1 : 0); + } else if (dataType instanceof BinaryType) { + return ZOrderingUtil.paddingTo8Byte(row.isNullAt(index) ? 
new byte[] {0} : (byte[]) row.get(index)); + } + return null; + }).filter(f -> f != null).collect(Collectors.toList()); + byte[][] zBytes = new byte[zBytesList.size()][]; + for (int i = 0; i < zBytesList.size(); i++) { + zBytes[i] = zBytesList.get(i); + } + List zVaules = new ArrayList<>(); + zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava()); + zVaules.add(ZOrderingUtil.interleaving(zBytes, 8)); + return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules)); + }).sortBy(f -> new ZorderingBinarySort((byte[]) f.get(fieldNum)), true, fileNum); + + // create new StructType + List newFields = new ArrayList<>(); + newFields.addAll(Arrays.asList(df.schema().fields())); + newFields.add(new StructField("zIndex", BinaryType$.MODULE$, true, Metadata.empty())); + + // create new DataFrame + return df.sparkSession().createDataFrame(sortedRdd, StructType$.MODULE$.apply(newFields)).drop("zIndex"); + } + + public static Dataset createZIndexedDataFrameByMapValue(Dataset df, String zCols, int fileNum) { + if (zCols == null || zCols.isEmpty() || fileNum <= 0) { + return df; + } + return createZIndexedDataFrameByMapValue(df, + Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum); + } + + public static Dataset createZIndexedDataFrameBySample(Dataset df, List zCols, int fileNum) { + return RangeSampleSort$.MODULE$.sortDataFrameBySample(df, JavaConversions.asScalaBuffer(zCols), fileNum); + } + + public static Dataset createZIndexedDataFrameBySample(Dataset df, String zCols, int fileNum) { + if (zCols == null || zCols.isEmpty() || fileNum <= 0) { + return df; + } + return createZIndexedDataFrameBySample(df, Arrays.stream(zCols.split(",")).map(f -> f.trim()).collect(Collectors.toList()), fileNum); + } + + /** + * Parse min/max statistics stored in parquet footers for z-sort cols. 
   * no support collect statistics from timeStampType, since parquet file has not collect the statistics for timeStampType.
   * to do adapt for rfc-27
   *
   * @param df a spark DataFrame holds parquet files to be read.
   * @param cols z-sort cols
   * @return a dataFrame holds all statistics info.
   */
  public static Dataset<Row> getMinMaxValue(Dataset<Row> df, List<String> cols) {
    // column name -> spark type; used below to decode the raw footer values
    Map<String, DataType> columnsMap = Arrays.stream(df.schema().fields()).collect(Collectors.toMap(e -> e.name(), e -> e.dataType()));

    List<String> scanFiles = Arrays.asList(df.inputFiles());
    SparkContext sc = df.sparkSession().sparkContext();
    JavaSparkContext jsc = new JavaSparkContext(sc);

    SerializableConfiguration serializableConfiguration = new SerializableConfiguration(sc.hadoopConfiguration());
    // roughly 3 files per task
    int numParallelism = (scanFiles.size() / 3 + 1);
    List<HoodieColumnRangeMetadata<Comparable>> colMinMaxInfos = new ArrayList<>();
    String previousJobDescription = sc.getLocalProperty(SPARK_JOB_DESCRIPTION);
    try {
      String description = "Listing parquet column statistics";
      jsc.setJobDescription(description);
      // read every file's footer in parallel and pull out min/max/null-count per column
      colMinMaxInfos = jsc.parallelize(scanFiles, numParallelism).mapPartitions(paths -> {
        Configuration conf = serializableConfiguration.value();
        ParquetUtils parquetUtils = (ParquetUtils) BaseFileUtils.getInstance(HoodieFileFormat.PARQUET);
        List<List<HoodieColumnRangeMetadata<Comparable>>> results = new ArrayList<>();
        while (paths.hasNext()) {
          String path = paths.next();
          results.add(parquetUtils.readRangeFromParquetMetadata(conf, new Path(path), cols));
        }
        return results.stream().flatMap(f -> f.stream()).iterator();
      }).collect();
    } finally {
      // always restore the caller's job description
      jsc.setJobDescription(previousJobDescription);
    }

    // group the per-column ranges by file, then build one Row per file:
    // (file, col1_min, col1_max, col1_num_nulls, col2_min, ...)
    Map<String, List<HoodieColumnRangeMetadata<Comparable>>> fileToStatsListMap = colMinMaxInfos.stream().collect(Collectors.groupingBy(e -> e.getFilePath()));
    JavaRDD<Row> allMetaDataRDD = jsc.parallelize(fileToStatsListMap.values().stream().collect(Collectors.toList()), 1).map(f -> {
      int colSize = f.size();
      if (colSize == 0) {
        return null;
      } else {
        List<Object> rows = new ArrayList<>();
        rows.add(f.get(0).getFilePath());
        cols.stream().forEach(col -> {
          HoodieColumnRangeMetadata<Comparable> currentColRangeMetaData =
              f.stream().filter(s -> s.getColumnName().trim().equalsIgnoreCase(col)).findFirst().orElse(null);
          DataType colType = columnsMap.get(col);
          if (currentColRangeMetaData == null || colType == null) {
            throw new HoodieException(String.format("cannot collect min/max statistics for col: %s", col));
          }
          // decode the raw footer value into the matching spark SQL type
          if (colType instanceof IntegerType) {
            rows.add(currentColRangeMetaData.getMinValue());
            rows.add(currentColRangeMetaData.getMaxValue());
          } else if (colType instanceof DoubleType) {
            rows.add(currentColRangeMetaData.getMinValue());
            rows.add(currentColRangeMetaData.getMaxValue());
          } else if (colType instanceof StringType) {
            // parquet stores strings as Binary
            String minString = new String(((Binary) currentColRangeMetaData.getMinValue()).getBytes());
            String maxString = new String(((Binary) currentColRangeMetaData.getMaxValue()).getBytes());
            rows.add(minString);
            rows.add(maxString);
          } else if (colType instanceof DecimalType) {
            // decimals are stored as scaled longs; the stringifier renders the logical value
            Double minDecimal = Double.parseDouble(currentColRangeMetaData.getStringifier().stringify(Long.valueOf(currentColRangeMetaData.getMinValue().toString())));
            Double maxDecimal = Double.parseDouble(currentColRangeMetaData.getStringifier().stringify(Long.valueOf(currentColRangeMetaData.getMaxValue().toString())));
            rows.add(BigDecimal.valueOf(minDecimal));
            rows.add(BigDecimal.valueOf(maxDecimal));
          } else if (colType instanceof DateType) {
            // dates are stored as days-since-epoch ints; stringifier renders "yyyy-MM-dd"
            rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getStringifier().stringify((int) currentColRangeMetaData.getMinValue())));
            rows.add(java.sql.Date.valueOf(currentColRangeMetaData.getStringifier().stringify((int) currentColRangeMetaData.getMaxValue())));
          } else if (colType instanceof LongType) {
            rows.add(currentColRangeMetaData.getMinValue());
            rows.add(currentColRangeMetaData.getMaxValue());
          } else if (colType instanceof ShortType) {
            rows.add(Short.parseShort(currentColRangeMetaData.getMinValue().toString()));
            rows.add(Short.parseShort(currentColRangeMetaData.getMaxValue().toString()));
          } else if (colType instanceof FloatType) {
            rows.add(currentColRangeMetaData.getMinValue());
            rows.add(currentColRangeMetaData.getMaxValue());
          } else if (colType instanceof BinaryType) {
            rows.add(((Binary) currentColRangeMetaData.getMinValue()).getBytes());
            rows.add(((Binary) currentColRangeMetaData.getMaxValue()).getBytes());
          } else if (colType instanceof BooleanType) {
            rows.add(currentColRangeMetaData.getMinValue());
            rows.add(currentColRangeMetaData.getMaxValue());
          } else if (colType instanceof ByteType) {
            rows.add(Byte.valueOf(currentColRangeMetaData.getMinValue().toString()));
            rows.add(Byte.valueOf(currentColRangeMetaData.getMaxValue().toString()));
          } else {
            throw new HoodieException(String.format("Not support type: %s", colType));
          }
          rows.add(currentColRangeMetaData.getNumNulls());
        });
        return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(rows));
      }
    }).filter(f -> f != null);
    // schema: "file" followed by (min, max, num_nulls) per requested column
    List<StructField> allMetaDataSchema = new ArrayList<>();
    allMetaDataSchema.add(new StructField("file", StringType$.MODULE$, true, Metadata.empty()));
    cols.forEach(col -> {
      allMetaDataSchema.add(new StructField(col + "_minValue", columnsMap.get(col), true, Metadata.empty()));
      allMetaDataSchema.add(new StructField(col + "_maxValue", columnsMap.get(col), true, Metadata.empty()));
      allMetaDataSchema.add(new StructField(col + "_num_nulls", LongType$.MODULE$, true, Metadata.empty()));
    });
    return df.sparkSession().createDataFrame(allMetaDataRDD, StructType$.MODULE$.apply(allMetaDataSchema));
  }

  /** Comma-separated-cols convenience overload of {@link #getMinMaxValue(Dataset, List)}. */
  public static Dataset<Row> getMinMaxValue(Dataset<Row> df, String cols) {
    List<String> rawCols = Arrays.asList(cols.split(",")).stream().map(f -> f.trim()).collect(Collectors.toList());
    return getMinMaxValue(df, rawCols);
  }

  /**
   * Update statistics info.
   * this method will update old index table by full out join,
   * and save the updated table into a new index table based on commitTime.
+ * old index table will be cleaned also. + * + * @param df a spark DataFrame holds parquet files to be read. + * @param cols z-sort cols. + * @param indexPath index store path. + * @param commitTime current operation commitTime. + * @param validateCommits all validate commits for current table. + * @return + */ + public static void saveStatisticsInfo(Dataset df, String cols, String indexPath, String commitTime, List validateCommits) { + Path savePath = new Path(indexPath, commitTime); + SparkSession spark = df.sparkSession(); + FileSystem fs = FSUtils.getFs(indexPath, spark.sparkContext().hadoopConfiguration()); + Dataset statisticsDF = ZCurveOptimizeHelper.getMinMaxValue(df, cols); + // try to find last validate index table from index path + try { + if (fs.exists(new Path(indexPath))) { + List allIndexTables = Arrays + .stream(fs.listStatus(new Path(indexPath))).filter(f -> f.isDirectory()).map(f -> f.getPath().getName()).collect(Collectors.toList()); + List candidateIndexTables = allIndexTables.stream().filter(f -> validateCommits.contains(f)).sorted().collect(Collectors.toList()); + List residualTables = allIndexTables.stream().filter(f -> !validateCommits.contains(f)).collect(Collectors.toList()); + Option latestIndexData = Option.empty(); + if (!candidateIndexTables.isEmpty()) { + latestIndexData = Option.of(spark.read().load(new Path(indexPath, candidateIndexTables.get(candidateIndexTables.size() - 1)).toString())); + // clean old index table, keep at most 1 index table. 
+ candidateIndexTables.remove(candidateIndexTables.size() - 1); + candidateIndexTables.forEach(f -> { + try { + fs.delete(new Path(indexPath, f)); + } catch (IOException ie) { + throw new HoodieException(ie); + } + }); + } + + // clean residualTables + // retried cluster operations at the same instant time is also considered, + // the residual files produced by retried are cleaned up before save statistics + // save statistics info to index table which named commitTime + residualTables.forEach(f -> { + try { + fs.delete(new Path(indexPath, f)); + } catch (IOException ie) { + throw new HoodieException(ie); + } + }); + + if (latestIndexData.isPresent() && latestIndexData.get().schema().equals(statisticsDF.schema())) { + // update the statistics info + String originalTable = "indexTable_" + java.util.UUID.randomUUID().toString().replace("-", ""); + String updateTable = "updateTable_" + java.util.UUID.randomUUID().toString().replace("-", ""); + latestIndexData.get().registerTempTable(originalTable); + statisticsDF.registerTempTable(updateTable); + // update table by full out join + List columns = Arrays.asList(statisticsDF.schema().fieldNames()); + spark.sql(HoodieSparkUtils$ + .MODULE$.createMergeSql(originalTable, updateTable, JavaConversions.asScalaBuffer(columns))).repartition(1).write().save(savePath.toString()); + } + } else { + statisticsDF.repartition(1).write().mode("overwrite").save(savePath.toString()); + } + } catch (IOException e) { + throw new HoodieException(e); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 172bbc4919592..ce39843275815 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -19,6 +19,7 @@ package org.apache.hudi import java.util.Properties + import 
org.apache.avro.Schema

  /**
   * Builds the SQL that merges two per-file statistics tables with a full outer join:
   * for each column, the left side's value wins unless the left join key is null
   * (i.e. the row only exists on the right).
   *
   * @param leftTable table name.
   * @param rightTable table name.
   * @param cols merged cols; cols.head is the join key.
   * @return merge sql.
   */
  def createMergeSql(leftTable: String, rightTable: String, cols: Seq[String]): String = {
    val joinKey = cols.head
    // one "if(left-key is null, right.c, left.c) as c" projection per column
    val projections = cols.map { c =>
      s" if (${leftTable}.${joinKey} is null, ${rightTable}.${c}, ${leftTable}.${c}) as ${c} "
    }.mkString(",")
    "select " + projections + s" from ${leftTable} full join ${rightTable} on ${leftTable}.${joinKey} = ${rightTable}.${joinKey}"
  }

  /**
   * Collect min/max/null-count statistics for candidate cols, one row per input file.
   * support all col types (computed with spark aggregations instead of parquet footers).
   *
   * @param df dataFrame holds read files.
   * @param cols candidate cols to collect statistics.
   * @return rows of (file, c_minValue, c_maxValue, c_num_nulls, ...)
   */
  def getMinMaxValueSpark(df: DataFrame, cols: Seq[String]): DataFrame = {
    val sqlContext = df.sparkSession.sqlContext
    import sqlContext.implicits._

    // per-column aggregations: min, max and count of the non-null values
    val perColAggs = cols.flatMap { c =>
      Seq(min(col(c)).as(c + "_minValue"), max(col(c)).as(c + "_maxValue"), count(c).as(c + "_noNullCount"))
    }
    val totalCount = count("*").as("totalNum")
    // final projection: null count = total rows - non-null rows
    val outputCols = Seq(col("file")) ++ cols.flatMap { c =>
      Seq(col(c + "_minValue"), col(c + "_maxValue"), expr(s"totalNum - ${c + "_noNullCount"}").as(c + "_num_nulls"))
    }

    df.select(input_file_name() as "file", col("*"))
      .groupBy($"file")
      .agg(totalCount, perColAggs: _*)
      .select(outputCols: _*)
  }
}
 */

package org.apache.spark.sql.hudi.execution

import java.util

import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, BoundReference, SortOrder, UnsafeProjection, UnsafeRow}
import org.apache.hudi.optimize.ZOrderingUtil
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.types._
import org.apache.spark.util.MutablePair
import org.apache.spark.util.random.SamplingUtils

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.{ClassTag, classTag}
import scala.util.hashing.byteswap32

/**
 * Samples an RDD to compute range bounds used when encoding rows onto a space-filling
 * curve. The sampling strategy is adapted from Spark's RangePartitioner: reservoir
 * sample per partition, then re-sample partitions that turned out to be skewed.
 *
 * @param zEncodeNum number of range buckets to produce (0 allowed for empty RDDs)
 * @param rdd input key/value RDD to sample
 * @param ascend ordering flag — not read by the sampling logic visible here
 * @param samplePointsPerPartitionHint sample points collected per output bucket
 */
class RangeSample[K: ClassTag, V](
    zEncodeNum: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascend: Boolean = true,
    val samplePointsPerPartitionHint: Int = 20) extends Serializable {

  // We allow zEncodeNum = 0, which happens when sorting an empty RDD under the default settings.
  require(zEncodeNum >= 0, s"Number of zEncodeNum cannot be negative but found $zEncodeNum.")
  require(samplePointsPerPartitionHint > 0,
    s"Sample points per partition must be greater than 0 but found $samplePointsPerPartitionHint")

  /** Returns weighted sample candidates (key, weight); weight ≈ 1 / sampling probability. */
  def getRangeBounds(): ArrayBuffer[(K, Float)] = {
    if (zEncodeNum <= 1) {
      ArrayBuffer.empty[(K, Float)]
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      // Cast to double to avoid overflowing ints or longs
      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * zEncodeNum, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      val (numItems, sketched) = sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        ArrayBuffer.empty[(K, Float)]
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        val candidates = ArrayBuffer.empty[(K, Float)]
        val imbalancedPartitions = mutable.Set.empty[Int]

        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            imbalancedPartitions += idx
          } else {
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }

        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          val seed = byteswap32(-rdd.id - 1)
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        candidates
      }
    }
  }

  /**
   * Determines the bounds for range partitioning from candidates with weights indicating how many
   * items each represents. Usually this is 1 over the probability used to sample this candidate.
   *
   * @param candidates unordered candidates with weights
   * @param partitions number of partitions
   * @return selected bounds
   */
  def determineBound[K : Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int, ordering: Ordering[K]): Array[K] = {
    val ordered = candidates.sortBy(_._1)(ordering)
    val numCandidates = ordered.size
    val sumWeights = ordered.map(_._2.toDouble).sum
    val step = sumWeights / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0
    var j = 0
    var previousBound = Option.empty[K]
    while ((i < numCandidates) && (j < partitions - 1)) {
      val (key, weight) = ordered(i)
      cumWeight += weight
      if (cumWeight >= target) {
        // Skip duplicate values.
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
          bounds += key
          target += step
          j += 1
          previousBound = Some(key)
        }
      }
      i += 1
    }
    bounds.toArray
  }

  /**
   * Like determineBound, but computes one bounds array per ordering/attribute and
   * projects each selected bound row down to an UnsafeRow of just that attribute.
   */
  def determineRowBounds[K : Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int, orderings: Seq[Ordering[K]],
      attributes: Seq[Attribute]): Array[Array[UnsafeRow]] = {

    orderings.zipWithIndex.map { case (ordering, index) =>
      val ordered = candidates.sortBy(_._1)(ordering)
      val numCandidates = ordered.size
      val sumWeights = ordered.map(_._2.toDouble).sum
      val step = sumWeights / partitions
      var cumWeight = 0.0
      var target = step
      val bounds = ArrayBuffer.empty[K]
      var i = 0
      var j = 0
      var previousBound = Option.empty[K]
      while ((i < numCandidates) && (j < partitions - 1)) {
        val (key, weight) = ordered(i)
        cumWeight += weight
        if (cumWeight >= target) {
          // Skip duplicate values.
          if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
            bounds += key
            target += step
            j += 1
            previousBound = Some(key)
          }
        }
        i += 1
      }
      // build project
      val project = UnsafeProjection.create(Seq(attributes(index)), attributes)
      bounds.map { bound =>
        val row = bound.asInstanceOf[UnsafeRow]
        project(row).copy()
      }.toArray
    }.toArray
  }

  /**
   * Sketches the input RDD via reservoir sampling on each partition.
   *
   * @param rdd the input RDD to sketch
   * @param sampleSizePerPartition max sample size per partition
   * @return (total number of items, an array of (partitionId, number of items, sample))
   */
  def sketch[K: ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
    val shift = rdd.id
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      // per-partition deterministic seed
      val seed = byteswap32(idx ^ (shift << 16))
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    val numItems = sketched.map(_._2).sum
    (numItems, sketched)
  }
}

/**
 * Locates the bucket index of a key within a sorted bounds array.
 */
class RawDecisionBound[K : Ordering : ClassTag](ordering: Ordering[K]) extends Serializable {

  private var binarySearch: ((Array[K], K) => Int) = {
    // For primitive keys, we can use the natural ordering. Otherwise, use the Ordering comparator.
    classTag[K] match {
      case ClassTag.Float =>
        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Float]], x.asInstanceOf[Float])
      case ClassTag.Double =>
        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Double]], x.asInstanceOf[Double])
      case ClassTag.Byte =>
        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Byte]], x.asInstanceOf[Byte])
      case ClassTag.Char =>
        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Char]], x.asInstanceOf[Char])
      case ClassTag.Short =>
        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Short]], x.asInstanceOf[Short])
      case ClassTag.Int =>
        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Int]], x.asInstanceOf[Int])
      case ClassTag.Long =>
        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Long]], x.asInstanceOf[Long])
      case _ =>
        val comparator = ordering.asInstanceOf[java.util.Comparator[Any]]
        (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[AnyRef]], x, comparator)
    }
  }

  /** Returns the index of the first bound that is >= key (clamped to the array length). */
  def getBound(key: Any, candidateBounds: Array[K]): Int = {
    val k = key.asInstanceOf[K]
    var bound = 0
    if (candidateBounds.length <= 128) {
      // linear scan beats binary search for small bound arrays
      while(bound < candidateBounds.length && ordering.gt(k, candidateBounds(bound))) {
        bound += 1
      }
    } else {
      bound = binarySearch(candidateBounds, k)
      // Arrays.binarySearch returns (-insertionPoint - 1) when the key is absent
      if (bound < 0 ) {
        bound = -bound - 1
      }
      if (bound > candidateBounds.length) {
        bound = candidateBounds.length
      }
    }
    bound
  }
}

/** Byte-array sort key compared lexicographically via ZOrderingUtil — the ordering for z-sorted rows. */
case class ZorderingBinarySort(b: Array[Byte]) extends Ordered[ZorderingBinarySort] with Serializable {
  override def compare(that: ZorderingBinarySort): Int = {
    val len = this.b.length
    ZOrderingUtil.compareTo(this.b, 0, len, that.b, 0, len)
  }
}

object RangeSampleSort {

  /**
   * create z-order DataFrame by sample
   * support all col types
   */
  def sortDataFrameBySampleSupportAllTypes(df: DataFrame, zCols: Seq[String], fileNum: Int): DataFrame = {
    val spark = df.sparkSession
    val internalRdd = df.queryExecution.toRdd
    val schema = df.schema
    val outputAttributes = df.queryExecution.analyzed.output
    val sortingExpressions = outputAttributes.filter(p => zCols.contains(p.name))
    if (sortingExpressions.length == 0 || sortingExpressions.length != zCols.size) {
      // some requested cols are missing from the schema; leave the frame untouched
      df
    } else {
      val zOrderBounds = df.sparkSession.sessionState.conf.getConfString(
        HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE.key,
        HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE.defaultValue.toString).toInt

      val sampleRdd = internalRdd.mapPartitionsInternal { iter =>
        val projection = UnsafeProjection.create(sortingExpressions, outputAttributes)
        val mutablePair = new MutablePair[InternalRow, Null]()
        // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
        // partition bounds. To get accurate samples, we need to copy the mutable keys.
        iter.map(row => mutablePair.update(projection(row).copy(), null))
      }

      val orderings = sortingExpressions.map(SortOrder(_, Ascending)).zipWithIndex.map { case (ord, i) =>
        ord.copy(child = BoundReference(i, ord.dataType, ord.nullable))
      }

      val lazyGeneratedOrderings = orderings.map(ord => new LazilyGeneratedOrdering(Seq(ord)))

      val sample = new RangeSample(zOrderBounds, sampleRdd)

      val rangeBounds = sample.getRangeBounds()

      implicit val ordering1 = lazyGeneratedOrderings(0)

      val sampleBounds = sample.determineRowBounds(rangeBounds, math.min(zOrderBounds, rangeBounds.length), lazyGeneratedOrderings, sortingExpressions)

      // orderings re-bound to column 0, used to rank single-column projections below
      val origin_orderings = sortingExpressions.map(SortOrder(_, Ascending)).map { ord =>
        ord.copy(child = BoundReference(0, ord.dataType, ord.nullable))
      }

      val origin_lazyGeneratedOrderings = origin_orderings.map(ord => new LazilyGeneratedOrdering(Seq(ord)))

      // expand bounds.
      // maybe it's better to use the value of "spark.zorder.bounds.number" as maxLength,
      // however this will lead to extra time costs when all zorder cols distinct count values are less than "spark.zorder.bounds.number"
      val maxLength = sampleBounds.map(_.length).max
      val expandSampleBoundsWithFactor = sampleBounds.map { bound =>
        val fillFactor = maxLength / bound.size.toDouble
        (bound, fillFactor)
      }

      val boundBroadCast = spark.sparkContext.broadcast(expandSampleBoundsWithFactor)

      val indexRdd = internalRdd.mapPartitionsInternal { iter =>
        val boundsWithFactor = boundBroadCast.value
        import java.util.concurrent.ThreadLocalRandom
        val threadLocalRandom = ThreadLocalRandom.current
        val maxBoundNum = boundsWithFactor.map(_._1.length).max
        val origin_Projections = sortingExpressions.map { se =>
          UnsafeProjection.create(Seq(se), outputAttributes)
        }

        iter.map { unsafeRow =>
          val interleaveValues = origin_Projections.zip(origin_lazyGeneratedOrderings).zipWithIndex.map { case ((rowProject, lazyOrdering), index) =>
            val row = rowProject(unsafeRow)
            val decisionBound = new RawDecisionBound(lazyOrdering)
            if (row.isNullAt(0)) {
              // nulls rank after every real bound
              maxBoundNum + 1
            } else {
              val (bound, factor) = boundsWithFactor(index)
              if (factor > 1) {
                // columns with few bounds are expanded; spread keys randomly inside their slot
                val currentRank = decisionBound.getBound(row, bound.asInstanceOf[Array[InternalRow]])
                currentRank * factor.toInt + threadLocalRandom.nextInt(factor.toInt)
              } else {
                decisionBound.getBound(row, bound.asInstanceOf[Array[InternalRow]])
              }
            }
          }.toArray.map(ZOrderingUtil.intTo8Byte(_))
          val zValues = ZOrderingUtil.interleaving(interleaveValues, 8)
          val mutablePair = new MutablePair[InternalRow, Array[Byte]]()

          mutablePair.update(unsafeRow, zValues)
        }
      }.sortBy(x => ZorderingBinarySort(x._2), numPartitions = fileNum).map(_._1)
      spark.internalCreateDataFrame(indexRdd, schema)
    }
  }

  /**
   * create z-order DataFrame by sample
   * first, sample origin data to get z-cols bounds, then create z-order DataFrame
   *
support all type data.
+   * this method need more resource and cost more time than createZIndexedDataFrameByMapValue
+   */
+  def sortDataFrameBySample(df: DataFrame, zCols: Seq[String], fileNum: Int): DataFrame = {
+    val spark = df.sparkSession
+    val columnsMap = df.schema.fields.map(item => (item.name, item)).toMap
+    val fieldNum = df.schema.fields.length
+    val checkCols = zCols.filter(col => columnsMap.contains(col))
+
+    if (zCols.isEmpty || checkCols.isEmpty) {
+      df
+    } else {
+      val zFields = zCols.map { col =>
+        val newCol = columnsMap.getOrElse(col, null)
+        if (newCol == null) {
+          (-1, null)
+        } else {
+          newCol.dataType match {
+            case LongType | DoubleType | FloatType | StringType | IntegerType | DateType | TimestampType | ShortType | ByteType =>
+              (df.schema.fields.indexOf(newCol), newCol)
+            case d: DecimalType =>
+              (df.schema.fields.indexOf(newCol), newCol)
+            case _ =>
+              (-1, null)
+          }
+        }
+      }.filter(_._1 != -1)
+      // Complex type found, use createZIndexedDataFrameByRange
+      if (zFields.length != zCols.length) {
+        return sortDataFrameBySampleSupportAllTypes(df, zCols, fileNum)
+      }
+
+      val rawRdd = df.rdd
+      val sampleRdd = rawRdd.map { row =>
+        val values = zFields.map { case (index, field) =>
+          field.dataType match {
+            case LongType =>
+              if (row.isNullAt(index)) Long.MaxValue else row.getLong(index)
+            case DoubleType =>
+              if (row.isNullAt(index)) Long.MaxValue else java.lang.Double.doubleToLongBits(row.getDouble(index))
+            case IntegerType =>
+              if (row.isNullAt(index)) Long.MaxValue else row.getInt(index).toLong
+            case FloatType =>
+              if (row.isNullAt(index)) Long.MaxValue else java.lang.Double.doubleToLongBits(row.getFloat(index).toDouble)
+            case StringType =>
+              if (row.isNullAt(index)) "" else row.getString(index)
+            case DateType =>
+              if (row.isNullAt(index)) Long.MaxValue else row.getDate(index).getTime
+            case TimestampType =>
+              if (row.isNullAt(index)) Long.MaxValue else row.getTimestamp(index).getTime
+            case ByteType =>
+              if (row.isNullAt(index)) Long.MaxValue else
row.getByte(index).toLong + case ShortType => + if (row.isNullAt(index)) Long.MaxValue else row.getShort(index).toLong + case d: DecimalType => + if (row.isNullAt(index)) Long.MaxValue else row.getDecimal(index).longValue() + case _ => + null + } + }.filter(v => v != null).toArray + (values, null) + } + val zOrderBounds = df.sparkSession.sessionState.conf.getConfString( + HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE.key, + HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE.defaultValue.toString).toInt + val sample = new RangeSample(zOrderBounds, sampleRdd) + val rangeBounds = sample.getRangeBounds() + val sampleBounds = { + val candidateColNumber = rangeBounds.head._1.length + (0 to candidateColNumber - 1).map { i => + val colRangeBound = rangeBounds.map(x => (x._1(i), x._2)) + + if (colRangeBound.head._1.isInstanceOf[String]) { + sample.determineBound(colRangeBound.asInstanceOf[ArrayBuffer[(String, Float)]], math.min(zOrderBounds, rangeBounds.length), Ordering[String]) + } else { + sample.determineBound(colRangeBound.asInstanceOf[ArrayBuffer[(Long, Float)]], math.min(zOrderBounds, rangeBounds.length), Ordering[Long]) + } + } + } + + // expand bounds. 
+ // maybe it's better to use the value of "spark.zorder.bounds.number" as maxLength, + // however this will lead to extra time costs when all zorder cols distinct count values are less then "spark.zorder.bounds.number" + val maxLength = sampleBounds.map(_.length).max + val expandSampleBoundsWithFactor = sampleBounds.map { bound => + val fillFactor = maxLength / bound.size + val newBound = new Array[Double](bound.length * fillFactor) + if (bound.isInstanceOf[Array[Long]] && fillFactor > 1) { + val longBound = bound.asInstanceOf[Array[Long]] + for (i <- 0 to bound.length - 1) { + for (j <- 0 to fillFactor - 1) { + // sample factor shoud not be too large, so it's ok to use 1 / fillfactor as slice + newBound(j + i*(fillFactor)) = longBound(i) + (j + 1) * (1 / fillFactor.toDouble) + } + } + (newBound, fillFactor) + } else { + (bound, 0) + } + } + + val boundBroadCast = spark.sparkContext.broadcast(expandSampleBoundsWithFactor) + + val indexRdd = rawRdd.mapPartitions { iter => + val expandBoundsWithFactor = boundBroadCast.value + val maxBoundNum = expandBoundsWithFactor.map(_._1.length).max + val longDecisionBound = new RawDecisionBound(Ordering[Long]) + val doubleDecisionBound = new RawDecisionBound(Ordering[Double]) + val stringDecisionBound = new RawDecisionBound(Ordering[String]) + import java.util.concurrent.ThreadLocalRandom + val threadLocalRandom = ThreadLocalRandom.current + + def getRank(rawIndex: Int, value: Long, isNull: Boolean): Int = { + val (expandBound, factor) = expandBoundsWithFactor(rawIndex) + if (isNull) { + expandBound.length + 1 + } else { + if (factor > 1) { + doubleDecisionBound.getBound(value + (threadLocalRandom.nextInt(factor) + 1)*(1 / factor.toDouble), expandBound.asInstanceOf[Array[Double]]) + } else { + longDecisionBound.getBound(value, expandBound.asInstanceOf[Array[Long]]) + } + } + } + + iter.map { row => + val values = zFields.zipWithIndex.map { case ((index, field), rawIndex) => + field.dataType match { + case LongType => + val 
isNull = row.isNullAt(index) + getRank(rawIndex, if (isNull) 0 else row.getLong(index), isNull) + case DoubleType => + val isNull = row.isNullAt(index) + getRank(rawIndex, if (isNull) 0 else java.lang.Double.doubleToLongBits(row.getDouble(index)), isNull) + case IntegerType => + val isNull = row.isNullAt(index) + getRank(rawIndex, if (isNull) 0 else row.getInt(index).toLong, isNull) + case FloatType => + val isNull = row.isNullAt(index) + getRank(rawIndex, if (isNull) 0 else java.lang.Double.doubleToLongBits(row.getFloat(index).toDouble), isNull) + case StringType => + val factor = maxBoundNum.toDouble / expandBoundsWithFactor(rawIndex)._1.length + if (row.isNullAt(index)) { + maxBoundNum + 1 + } else { + val currentRank = stringDecisionBound.getBound(row.getString(index), expandBoundsWithFactor(rawIndex)._1.asInstanceOf[Array[String]]) + if (factor > 1) { + (currentRank*factor).toInt + threadLocalRandom.nextInt(factor.toInt) + } else { + currentRank + } + } + case DateType => + val isNull = row.isNullAt(index) + getRank(rawIndex, if (isNull) 0 else row.getDate(index).getTime, isNull) + case TimestampType => + val isNull = row.isNullAt(index) + getRank(rawIndex, if (isNull) 0 else row.getTimestamp(index).getTime, isNull) + case ByteType => + val isNull = row.isNullAt(index) + getRank(rawIndex, if (isNull) 0 else row.getByte(index).toLong, isNull) + case ShortType => + val isNull = row.isNullAt(index) + getRank(rawIndex, if (isNull) 0 else row.getShort(index).toLong, isNull) + case d: DecimalType => + val isNull = row.isNullAt(index) + getRank(rawIndex, if (isNull) 0 else row.getDecimal(index).longValue(), isNull) + case _ => + -1 + } + }.filter(v => v != -1).map(ZOrderingUtil.intTo8Byte(_)).toArray + val zValues = ZOrderingUtil.interleaving(values, 8) + Row.fromSeq(row.toSeq ++ Seq(zValues)) + } + }.sortBy(x => ZorderingBinarySort(x.getAs[Array[Byte]](fieldNum)), numPartitions = fileNum) + val newDF = df.sparkSession.createDataFrame(indexRdd, StructType( + 
df.schema.fields ++ Seq( + StructField(s"zindex", + BinaryType, false)) + )) + newDF.drop("zindex") + } + } +} + diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java new file mode 100644 index 0000000000000..ca977ae53b5f9 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.parquet.schema.PrimitiveStringifier; + +import java.util.Objects; + +/** + * Hoodie Range metadata. 
+ */
+public class HoodieColumnRangeMetadata<T extends Comparable<T>> {
+  private final String filePath;
+  private final String columnName;
+  private final T minValue;
+  private final T maxValue;
+  private final long numNulls;
+  private final PrimitiveStringifier stringifier;
+
+  public HoodieColumnRangeMetadata(final String filePath, final String columnName, final T minValue, final T maxValue, final long numNulls, final PrimitiveStringifier stringifier) {
+    this.filePath = filePath;
+    this.columnName = columnName;
+    this.minValue = minValue;
+    this.maxValue = maxValue;
+    this.numNulls = numNulls;
+    this.stringifier = stringifier;
+  }
+
+  public String getFilePath() {
+    return this.filePath;
+  }
+
+  public String getColumnName() {
+    return this.columnName;
+  }
+
+  public T getMinValue() {
+    return this.minValue;
+  }
+
+  public T getMaxValue() {
+    return this.maxValue;
+  }
+
+  public PrimitiveStringifier getStringifier() {
+    return stringifier;
+  }
+
+  public long getNumNulls() {
+    return numNulls;
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final HoodieColumnRangeMetadata<?> that = (HoodieColumnRangeMetadata<?>) o;
+    return Objects.equals(getFilePath(), that.getFilePath())
+        && Objects.equals(getColumnName(), that.getColumnName())
+        && Objects.equals(getMinValue(), that.getMinValue())
+        && Objects.equals(getMaxValue(), that.getMaxValue())
+        && Objects.equals(getNumNulls(), that.getNumNulls());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getColumnName(), getMinValue(), getMaxValue(), getNumNulls());
+  }
+
+  @Override
+  public String toString() {
+    return "HoodieColumnRangeMetadata{"
+        + "filePath='" + filePath + '\''
+        + ", columnName='" + columnName + '\''
+        + ", minValue=" + minValue
+        + ", maxValue=" + maxValue
+        + ", numNulls=" + numNulls + '}';
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 2b94d7ff072a5..340a99ec2e208 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -79,6 +79,7 @@ public class HoodieTableMetaClient implements Serializable { public static final String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".aux"; public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + Path.SEPARATOR + ".bootstrap"; public static final String HEARTBEAT_FOLDER_NAME = METAFOLDER_NAME + Path.SEPARATOR + ".heartbeat"; + public static final String ZINDEX_NAME = ".zindex"; public static final String BOOTSTRAP_INDEX_BY_PARTITION_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + Path.SEPARATOR + ".partitions"; public static final String BOOTSTRAP_INDEX_BY_FILE_ID_FOLDER_PATH = BOOTSTRAP_INDEX_ROOT_FOLDER_PATH + Path.SEPARATOR @@ -176,6 +177,13 @@ public String getMetaPath() { return metaPath; } + /** + * @return z-index path + */ + public String getZindexPath() { + return new Path(metaPath, ZINDEX_NAME).toString(); + } + /** * @return Temp Folder path */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index ebe361025991c..c142e8a9608be 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -20,6 +20,7 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.exception.HoodieIOException; @@ -41,12 +42,14 @@ import java.io.IOException; import java.util.ArrayList; +import 
java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; /** * Utility functions involving with parquet. @@ -277,4 +280,59 @@ public Boolean apply(String recordKey) { return candidateKeys.contains(recordKey); } } + + /** + * Parse min/max statistics stored in parquet footers for all columns. + */ + public Collection> readRangeFromParquetMetadata(Configuration conf, Path parquetFilePath, List cols) { + ParquetMetadata metadata = readMetadata(conf, parquetFilePath); + // collect stats from all parquet blocks + Map>> columnToStatsListMap = metadata.getBlocks().stream().flatMap(blockMetaData -> { + return blockMetaData.getColumns().stream().filter(f -> cols.contains(f.getPath().toDotString())).map(columnChunkMetaData -> + new HoodieColumnRangeMetadata<>(parquetFilePath.getName(), columnChunkMetaData.getPath().toDotString(), + columnChunkMetaData.getStatistics().genericGetMin(), + columnChunkMetaData.getStatistics().genericGetMax(), + columnChunkMetaData.getStatistics().getNumNulls(), + columnChunkMetaData.getPrimitiveType().stringifier())); + }).collect(Collectors.groupingBy(e -> e.getColumnName())); + + // we only intend to keep file level statistics. + return new ArrayList<>(columnToStatsListMap.values().stream() + .map(blocks -> getColumnRangeInFile(blocks)) + .collect(Collectors.toList())); + } + + private HoodieColumnRangeMetadata getColumnRangeInFile(final List> blockRanges) { + if (blockRanges.size() == 1) { + // only one block in parquet file. we can just return that range. + return blockRanges.get(0); + } else { + // there are multiple blocks. 
Compute min(block_mins) and max(block_maxs) + return blockRanges.stream().reduce((b1, b2) -> combineRanges(b1, b2)).get(); + } + } + + private HoodieColumnRangeMetadata combineRanges(HoodieColumnRangeMetadata range1, + HoodieColumnRangeMetadata range2) { + final Comparable minValue; + final Comparable maxValue; + if (range1.getMinValue() != null && range2.getMinValue() != null) { + minValue = range1.getMinValue().compareTo(range2.getMinValue()) < 0 ? range1.getMinValue() : range2.getMinValue(); + } else if (range1.getMinValue() == null) { + minValue = range2.getMinValue(); + } else { + minValue = range1.getMinValue(); + } + + if (range1.getMaxValue() != null && range2.getMaxValue() != null) { + maxValue = range1.getMaxValue().compareTo(range2.getMaxValue()) < 0 ? range2.getMaxValue() : range1.getMaxValue(); + } else if (range1.getMaxValue() == null) { + maxValue = range2.getMaxValue(); + } else { + maxValue = range1.getMaxValue(); + } + + return new HoodieColumnRangeMetadata<>(range1.getFilePath(), + range1.getColumnName(), minValue, maxValue, range1.getNumNulls() + range2.getNumNulls(), range1.getStringifier()); + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index dc8e7ed464ac6..94bcc0d0de85e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -110,6 +110,12 @@ object DataSourceReadOptions { .withDocumentation("The query instant for time travel. 
Without specified this option," +
     " we query the latest snapshot.")
 
+  val ENABLE_DATA_SKIPPING: ConfigProperty[Boolean] = ConfigProperty
+    .key("hoodie.enable.data.skipping")
+    .defaultValue(false)
+    .sinceVersion("0.10.0")
+    .withDocumentation("enable data skipping to boost query after doing z-order optimize for current table")
+
   /** @deprecated Use {@link QUERY_TYPE} and its methods instead */
   @Deprecated
   val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index af0c2cc11b026..882636c4697be 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -28,19 +28,20 @@ import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTab
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Column, SparkSession}
 import org.apache.spark.sql.avro.SchemaConverters
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.{InternalRow, expressions}
 import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory}
-import org.apache.spark.sql.hudi.HoodieSqlUtils
+import org.apache.spark.sql.hudi.{DataSkippingUtils, HoodieSqlUtils}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import
org.apache.spark.unsafe.types.UTF8String - import java.util.Properties + import scala.collection.JavaConverters._ +import scala.collection.JavaConversions._ import scala.collection.mutable /** @@ -84,6 +85,12 @@ case class HoodieFileIndex( private val specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) .map(HoodieSqlUtils.formatQueryInstant) + /** + * Get all completeCommits. + */ + lazy val completedCommits = metaClient.getCommitsTimeline + .filterCompletedInstants().getInstants.iterator().toList.map(_.getTimestamp) + /** * Get the schema of the table. */ @@ -147,6 +154,48 @@ case class HoodieFileIndex( override def rootPaths: Seq[Path] = queryPath :: Nil + def enableDataSkipping(): Boolean = { + options.getOrElse(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), + spark.sessionState.conf.getConfString(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "false")).toBoolean + } + + private def filterFilesByDataSkippingIndex(dataFilters: Seq[Expression]): Set[String] = { + var allFiles: Set[String] = Set.empty + var candidateFiles: Set[String] = Set.empty + val indexPath = metaClient.getZindexPath + val fs = metaClient.getFs + if (fs.exists(new Path(indexPath)) && dataFilters.nonEmpty) { + // try to load latest index table from index path + val candidateIndexTables = fs.listStatus(new Path(indexPath)).filter(_.isDirectory) + .map(_.getPath.getName).filter(f => completedCommits.contains(f)).sortBy(x => x) + if (candidateIndexTables.nonEmpty) { + val dataFrameOpt = try { + Some(spark.read.load(new Path(indexPath, candidateIndexTables.last).toString)) + } catch { + case _: Throwable => + logError("missing index skip data-skipping") + None + } + + if (dataFrameOpt.isDefined) { + val indexSchema = dataFrameOpt.get.schema + val indexFiles = DataSkippingUtils.getIndexFiles(spark.sparkContext.hadoopConfiguration, new Path(indexPath, candidateIndexTables.last).toString) + val indexFilter = 
dataFilters.map(DataSkippingUtils.createZindexFilter(_, indexSchema)).reduce(And) + logInfo(s"index filter condition: $indexFilter") + dataFrameOpt.get.persist() + if (indexFiles.size <= 4) { + allFiles = DataSkippingUtils.readParquetFile(spark, indexFiles) + } else { + allFiles = dataFrameOpt.get.select("file").collect().map(_.getString(0)).toSet + } + candidateFiles = dataFrameOpt.get.filter(new Column(indexFilter)).select("file").collect().map(_.getString(0)).toSet + dataFrameOpt.get.unpersist() + } + } + } + allFiles -- candidateFiles + } + /** * Invoked by Spark to fetch list of latest base files per partition. * @@ -156,12 +205,29 @@ case class HoodieFileIndex( */ override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { + // try to load filterFiles from index + val filterFiles: Set[String] = if (enableDataSkipping()) { + filterFilesByDataSkippingIndex(dataFilters) + } else { + Set.empty + } if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table. 
-      Seq(PartitionDirectory(InternalRow.empty, allFiles))
+      val candidateFiles = if (!filterFiles.isEmpty) {
+        allFiles.filterNot(fileStatus => filterFiles.contains(fileStatus.getPath.getName))
+      } else {
+        allFiles
+      }
+      logInfo(s"Total files : ${allFiles.size}," +
+        s" candidate files after data skipping: ${candidateFiles.size} " +
+        s" skipping percent ${if (allFiles.length != 0) (allFiles.size - candidateFiles.size) / allFiles.size.toDouble else 0}")
+      Seq(PartitionDirectory(InternalRow.empty, candidateFiles))
     } else {
       // Prune the partition path by the partition filters
       val prunedPartitions = prunePartition(cachedAllInputFileSlices.keys.toSeq, partitionFilters)
-      prunedPartitions.map { partition =>
+      var totalFileSize = 0
+      var candidateFileSize = 0
+
+      val result = prunedPartitions.map { partition =>
         val baseFileStatuses = cachedAllInputFileSlices(partition).map(fileSlice => {
           if (fileSlice.getBaseFile.isPresent) {
             fileSlice.getBaseFile.get().getFileStatus
@@ -169,9 +235,19 @@ case class HoodieFileIndex(
             null
           }
         }).filterNot(_ == null)
-
-        PartitionDirectory(partition.values, baseFileStatuses)
+        val candidateFiles = if (!filterFiles.isEmpty) {
+          baseFileStatuses.filterNot(fileStatus => filterFiles.contains(fileStatus.getPath.getName))
+        } else {
+          baseFileStatuses
+        }
+        totalFileSize += baseFileStatuses.size
+        candidateFileSize += candidateFiles.size
+        PartitionDirectory(partition.values, candidateFiles)
       }
+      logInfo(s"Total files: ${totalFileSize}," +
+        s" Candidate files after data skipping : ${candidateFileSize} " +
+        s"skipping percent ${if (totalFileSize != 0) (totalFileSize - candidateFileSize) / totalFileSize.toDouble else 0}")
+      result
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
new file mode 100644
index 0000000000000..45a7aec142d5a
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, AttributeReference, EqualNullSafe, EqualTo, Expression, ExtractValue, GetStructField, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not, Or, StartsWith} +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.unsafe.types.UTF8String + +import scala.collection.JavaConverters._ + +object DataSkippingUtils { + + /** + * 
create z_index filter and push those filters to index table to filter all candidate scan files. + * @param condition origin filter from query. + * @param indexSchema schema from index table. + * @return filters for index table. + */ + def createZindexFilter(condition: Expression, indexSchema: StructType): Expression = { + def buildExpressionInternal(colName: Seq[String], statisticValue: String): Expression = { + val appendColName = UnresolvedAttribute(colName).name + statisticValue + col(appendColName).expr + } + + def reWriteCondition(colName: Seq[String], conditionExpress: Expression): Expression = { + val appendColName = UnresolvedAttribute(colName).name + "_minValue" + if (indexSchema.exists(p => p.name == appendColName)) { + conditionExpress + } else { + Literal.TrueLiteral + } + } + + val minValue = (colName: Seq[String]) => buildExpressionInternal(colName, "_minValue") + val maxValue = (colName: Seq[String]) => buildExpressionInternal(colName, "_maxValue") + val num_nulls = (colName: Seq[String]) => buildExpressionInternal(colName, "_num_nulls") + + condition match { + // query filter "colA = b" convert it to "colA_minValue <= b and colA_maxValue >= b" for index table + case EqualTo(attribute: AttributeReference, value: Literal) => + val colName = getTargetColNameParts(attribute) + reWriteCondition(colName, And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))) + // query filter "b = colA" convert it to "colA_minValue <= b and colA_maxValue >= b" for index table + case EqualTo(value: Literal, attribute: AttributeReference) => + val colName = getTargetColNameParts(attribute) + reWriteCondition(colName, And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))) + // query filter "colA = null" convert it to "colA_num_nulls = null" for index table + case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @ Literal(null, _)) => + val colName = getTargetColNameParts(equalNullSafe.left) + 
reWriteCondition(colName, EqualTo(num_nulls(colName), equalNullSafe.right))
+      // query filter "colA < b" convert it to "colA_minValue < b" for index table
+      case LessThan(attribute: AttributeReference, value: Literal) =>
+        val colName = getTargetColNameParts(attribute)
+        reWriteCondition(colName, LessThan(minValue(colName), value))
+      // query filter "b < colA" convert it to "colA_maxValue > b" for index table
+      case LessThan(value: Literal, attribute: AttributeReference) =>
+        val colName = getTargetColNameParts(attribute)
+        reWriteCondition(colName, GreaterThan(maxValue(colName), value))
+      // query filter "colA > b" convert it to "colA_maxValue > b" for index table
+      case GreaterThan(attribute: AttributeReference, value: Literal) =>
+        val colName = getTargetColNameParts(attribute)
+        reWriteCondition(colName, GreaterThan(maxValue(colName), value))
+      // query filter "b > colA" convert it to "colA_minValue < b" for index table
+      case GreaterThan(value: Literal, attribute: AttributeReference) =>
+        val colName = getTargetColNameParts(attribute)
+        reWriteCondition(colName, LessThan(minValue(colName), value))
+      // query filter "colA <= b" convert it to "colA_minValue <= b" for index table
+      case LessThanOrEqual(attribute: AttributeReference, value: Literal) =>
+        val colName = getTargetColNameParts(attribute)
+        reWriteCondition(colName, LessThanOrEqual(minValue(colName), value))
+      // query filter "b <= colA" convert it to "colA_maxValue >= b" for index table
+      case LessThanOrEqual(value: Literal, attribute: AttributeReference) =>
+        val colName = getTargetColNameParts(attribute)
+        reWriteCondition(colName, GreaterThanOrEqual(maxValue(colName), value))
+      // query filter "colA >= b" convert it to "colA_maxValue >= b" for index table
+      case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) =>
+        val colName = getTargetColNameParts(attribute)
+        reWriteCondition(colName, GreaterThanOrEqual(maxValue(colName), right))
+      // query filter "b >= colA" convert it to "colA_minValue <= b" for index
table + case GreaterThanOrEqual(value: Literal, attribute: AttributeReference) => + val colName = getTargetColNameParts(attribute) + reWriteCondition(colName, LessThanOrEqual(minValue(colName), value)) + // query filter "colA is null" convert it to "colA_num_nulls > 0" for index table + case IsNull(attribute: AttributeReference) => + val colName = getTargetColNameParts(attribute) + reWriteCondition(colName, GreaterThan(num_nulls(colName), Literal(0))) + // query filter "colA is not null" convert it to "colA_num_nulls = 0" for index table + case IsNotNull(attribute: AttributeReference) => + val colName = getTargetColNameParts(attribute) + reWriteCondition(colName, EqualTo(num_nulls(colName), Literal(0))) + // query filter "colA in (a,b)" convert it to " (colA_minValue <= a and colA_maxValue >= a) or (colA_minValue <= b and colA_maxValue >= b) " for index table + case In(attribute: AttributeReference, list: Seq[Literal]) => + val colName = getTargetColNameParts(attribute) + reWriteCondition(colName, list.map { lit => + And(LessThanOrEqual(minValue(colName), lit), GreaterThanOrEqual(maxValue(colName), lit)) + }.reduce(Or)) + // query filter "colA like xxx" convert it to " (colA_minValue <= xxx and colA_maxValue >= xxx) or (colA_min start with xxx or colA_max start with xxx) " for index table + case StartsWith(attribute, v @ Literal(_: UTF8String, _)) => + val colName = getTargetColNameParts(attribute) + reWriteCondition(colName, Or(And(LessThanOrEqual(minValue(colName), v), GreaterThanOrEqual(maxValue(colName), v)) , + Or(StartsWith(minValue(colName), v), StartsWith(maxValue(colName), v)))) + // query filter "colA not in (a, b)" convert it to " (not( colA_minValue = a and colA_maxValue = a)) and (not( colA_minValue = b and colA_maxValue = b)) " for index table + case Not(In(attribute: AttributeReference, list: Seq[Literal])) => + val colName = getTargetColNameParts(attribute) + reWriteCondition(colName, list.map { lit => + Not(And(EqualTo(minValue(colName), lit), 
EqualTo(maxValue(colName), lit))) + }.reduce(And)) + // query filter "colA != b" convert it to "not ( colA_minValue = b and colA_maxValue = b )" for index table + case Not(EqualTo(attribute: AttributeReference, value: Literal)) => + val colName = getTargetColNameParts(attribute) + reWriteCondition(colName, Not(And(EqualTo(minValue(colName), value), EqualTo(maxValue(colName), value)))) + // query filter "b != colA" convert it to "not ( colA_minValue = b and colA_maxValue = b )" for index table + case Not(EqualTo(value: Literal, attribute: AttributeReference)) => + val colName = getTargetColNameParts(attribute) + reWriteCondition(colName, Not(And(EqualTo(minValue(colName), value), EqualTo(maxValue(colName), value)))) + // query filter "colA not like xxxx" convert it to "not ( colA_minValue startWith xxx and colA_maxValue startWith xxx)" for index table + case Not(StartsWith(attribute, value @ Literal(_: UTF8String, _))) => + val colName = getTargetColNameParts(attribute) + reWriteCondition(colName, Not(And(StartsWith(minValue(colName), value), StartsWith(maxValue(colName), value)))) + case or: Or => + val resLeft = createZindexFilter(or.left, indexSchema) + val resRight = createZindexFilter(or.right, indexSchema) + Or(resLeft, resRight) + + case and: And => + val resLeft = createZindexFilter(and.left, indexSchema) + val resRight = createZindexFilter(and.right, indexSchema) + And(resLeft, resRight) + + case expr: Expression => + Literal.TrueLiteral + } + } + + /** + * Extracts name from a resolved expression referring to a nested or non-nested column. 
+ */ + def getTargetColNameParts(resolvedTargetCol: Expression): Seq[String] = { + resolvedTargetCol match { + case attr: Attribute => Seq(attr.name) + + case Alias(c, _) => getTargetColNameParts(c) + + case GetStructField(c, _, Some(name)) => getTargetColNameParts(c) :+ name + + case ex: ExtractValue => + throw new AnalysisException(s"convert reference to name failed, Updating nested fields is only supported for StructType: ${ex}.") + + case other => + throw new AnalysisException(s"convert reference to name failed, Found unsupported expression ${other}") + } + } + + def getIndexFiles(conf: Configuration, indexPath: String): Seq[FileStatus] = { + val basePath = new Path(indexPath) + basePath.getFileSystem(conf) + .listStatus(basePath).filterNot(f => f.getPath.getName.endsWith(".parquet")) + } + + /** + * read parquet files concurrently by local. + * this method is mush faster than spark + */ + def readParquetFile(spark: SparkSession, indexFiles: Seq[FileStatus], filters: Seq[Filter] = Nil, schemaOpts: Option[StructType] = None): Set[String] = { + val hadoopConf = spark.sparkContext.hadoopConfiguration + val partitionedFiles = indexFiles.map(f => PartitionedFile(InternalRow.empty, f.getPath.toString, 0, f.getLen)) + + val requiredSchema = new StructType().add("file", StringType, true) + val schema = schemaOpts.getOrElse(requiredSchema) + val parquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(spark + , schema , StructType(Nil), requiredSchema, filters, Map.empty, hadoopConf) + val results = new Array[Iterator[String]](partitionedFiles.size) + partitionedFiles.zipWithIndex.par.foreach { case (pf, index) => + val fileIterator = parquetReader(pf).asInstanceOf[Iterator[Any]] + val rows = fileIterator.flatMap(_ match { + case r: InternalRow => Seq(r) + case b: ColumnarBatch => b.rowIterator().asScala + }).map(r => r.getString(0)) + results(index) = rows + } + results.flatMap(f => f).toSet + } +} diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestOptimizeTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestOptimizeTable.scala new file mode 100644 index 0000000000000..06ac600b0346e --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestOptimizeTable.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.hudi.functional

import java.sql.{Date, Timestamp}

import org.apache.hadoop.fs.Path
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.spark.ZCurveOptimizeHelper
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

import scala.collection.JavaConversions._
import scala.util.Random

/**
 * Tests for layout optimization (z-order/space-filling-curve clustering) and
 * min/max statistics collection.
 */
class TestOptimizeTable extends HoodieClientTestBase {
  var spark: SparkSession = _

  val commonOpts = Map(
    "hoodie.insert.shuffle.parallelism" -> "4",
    "hoodie.upsert.shuffle.parallelism" -> "4",
    "hoodie.bulkinsert.shuffle.parallelism" -> "4",
    DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key",
    DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "partition",
    DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "timestamp",
    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
  )

  @BeforeEach override def setUp() {
    initPath()
    initSparkContexts()
    spark = sqlContext.sparkSession
    initTestDataGenerator()
    initFileSystem()
  }

  @AfterEach override def tearDown() = {
    cleanupSparkContexts()
    cleanupTestDataGenerator()
    cleanupFileSystem()
  }

  @ParameterizedTest
  @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ"))
  def testOptimizewithClustering(tableType: String): Unit = {
    // Bulk Insert Operation
    val records1 = recordsToStrings(dataGen.generateInserts("001", 1000)).toList
    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
    inputDF1.write.format("org.apache.hudi")
      .options(commonOpts)
      .option("hoodie.compact.inline", "false")
      .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType)
      // options for inline clustering with layout optimization
      .option("hoodie.parquet.small.file.limit", "0")
      .option("hoodie.clustering.inline", "true")
      .option("hoodie.clustering.inline.max.commits", "1")
      .option("hoodie.clustering.plan.strategy.small.file.limit", "629145600")
      .option("hoodie.clustering.plan.strategy.max.bytes.per.group", Long.MaxValue.toString)
      // NOTE: this key was previously set twice; only the last value (64 MB) took effect.
      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(64 * 1024 * 1024L))
      .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
      .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "begin_lat, begin_lon")
      .mode(SaveMode.Overwrite)
      .save(basePath)

    // Row count must be preserved by the clustering rewrite, with and without data skipping.
    assertEquals(1000, spark.read.format("hudi").load(basePath).count())
    assertEquals(1000,
      spark.read.option(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key(), "true").format("hudi").load(basePath).count())
  }

  @Test
  def testCollectMinMaxStatistics(): Unit = {
    val testPath = new Path(System.getProperty("java.io.tmpdir"), "minMax")
    val statisticPath = new Path(System.getProperty("java.io.tmpdir"), "stat")
    val fs = testPath.getFileSystem(spark.sparkContext.hadoopConfiguration)
    try {
      val complexDataFrame = createComplexDataFrame(spark)
      complexDataFrame.repartition(3).write.mode("overwrite").save(testPath.toString)
      val df = spark.read.load(testPath.toString)
      // do not support TimeStampType, so if we collect statistics for c4, should throw exception
      val colDf = ZCurveOptimizeHelper.getMinMaxValue(df, "c1,c2,c3,c5,c6,c7,c8")
      colDf.cache()
      // assertEquals takes (expected, actual) — fixed the inverted argument order.
      assertEquals(3, colDf.count())
      assertEquals(22, colDf.take(1)(0).length)
      colDf.unpersist()
      // try to save statistics
      ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "2", Seq("0", "1"))
      // save again
      ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "3", Seq("0", "1", "2"))
      // test old index table clean
      ZCurveOptimizeHelper.saveStatisticsInfo(df, "c1,c2,c3,c5,c6,c7,c8", statisticPath.toString, "4", Seq("0", "1", "3"))
      assertFalse(fs.exists(new Path(statisticPath, "2")))
      assertTrue(fs.exists(new Path(statisticPath, "3")))
    } finally {
      // delete(Path) is deprecated; use the explicit recursive form for directories.
      if (fs.exists(testPath)) fs.delete(testPath, true)
      if (fs.exists(statisticPath)) fs.delete(statisticPath, true)
    }
  }

  /**
   * Builds a DataFrame covering the column types supported by statistics collection
   * (int, string, decimal, short, date, binary, byte) plus an unsupported timestamp (c4).
   */
  def createComplexDataFrame(spark: SparkSession): DataFrame = {
    val schema = new StructType()
      .add("c1", IntegerType)
      .add("c2", StringType)
      .add("c3", DecimalType(9, 3))
      .add("c4", TimestampType)
      .add("c5", ShortType)
      .add("c6", DateType)
      .add("c7", BinaryType)
      .add("c8", ByteType)

    val rdd = spark.sparkContext.parallelize(0 to 1000, 1).map { item =>
      val c1 = Integer.valueOf(item)
      val c2 = s" ${item}sdc"
      val c3 = new java.math.BigDecimal(s"${Random.nextInt(1000)}.${item}")
      val c4 = new Timestamp(System.currentTimeMillis())
      val c5 = java.lang.Short.valueOf(s"${(item + 16) / 10}")
      val c6 = Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}")
      val c7 = Array(item).map(_.toByte)
      val c8 = java.lang.Byte.valueOf("9")

      RowFactory.create(c1, c2, c3, c4, c5, c6, c7, c8)
    }
    spark.createDataFrame(rdd, schema)
  }
}