Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.exception.HoodieException;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Locale;
import java.util.Properties;

/**
Expand All @@ -40,6 +42,9 @@ public class HoodieClusteringConfig extends HoodieConfig {
// Any strategy specific params can be saved with this prefix
public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy.";

// Any Space-filling curves optimize(z-order/hilbert) params can be saved with this prefix
public static final String LAYOUT_OPTIMIZE_PARAM_PREFIX = "hoodie.layout.optimize.";

public static final ConfigProperty<String> DAYBASED_LOOKBACK_PARTITIONS = ConfigProperty
.key(CLUSTERING_STRATEGY_PARAM_PREFIX + "daybased.lookback.partitions")
.defaultValue("2")
Expand Down Expand Up @@ -137,6 +142,55 @@ public class HoodieClusteringConfig extends HoodieConfig {
.sinceVersion("0.9.0")
.withDocumentation("When rewriting data, preserves existing hoodie_commit_time");

// NOTE(review): declared with type parameters (instead of the raw ConfigProperty type used
// originally) so default values are compile-time checked; keys and defaults are unchanged.
public static final ConfigProperty<Boolean> LAYOUT_OPTIMIZE_ENABLE = ConfigProperty
    .key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "enable")
    .defaultValue(false)
    .sinceVersion("0.10.0")
    .withDocumentation("Enable use z-ordering/space-filling curves to optimize the layout of table to boost query performance. "
        + "This parameter takes precedence over clustering strategy set using " + EXECUTION_STRATEGY_CLASS_NAME.key());

public static final ConfigProperty<String> LAYOUT_OPTIMIZE_STRATEGY = ConfigProperty
    .key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "strategy")
    .defaultValue("z-order")
    .sinceVersion("0.10.0")
    .withDocumentation("Type of layout optimization to be applied, current only supports `z-order` and `hilbert` curves.");

/**
 * There are two methods to build a z-curve:
 * 1) directly mapping the sort columns to z-values (`direct`); this method can be found in
 *    Amazon DynamoDB: https://aws.amazon.com/cn/blogs/database/tag/z-order/
 * 2) the boundary-based interleaved index method we proposed (`sample`); refer to RFC-28 for
 *    the specific algorithm flow.
 * The boundary-based interleaved index method generalizes better, but builds the curve more
 * slowly than the direct method.
 */
public static final ConfigProperty<String> LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD = ConfigProperty
    .key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "curve.build.method")
    .defaultValue("direct")
    .sinceVersion("0.10.0")
    .withDocumentation("Controls how data is sampled to build the space filling curves. two methods: `direct`,`sample`."
        + "The direct method is faster than the sampling, however sample method would produce a better data layout.");

/**
 * Sampling the table data is the first step of the boundary-based interleaved index method.
 * A larger sample size yields a better layout, at the cost of more memory.
 */
public static final ConfigProperty<String> LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE = ConfigProperty
    .key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "build.curve.sample.size")
    .defaultValue("200000")
    .sinceVersion("0.10.0")
    // fixed: original doc string read "when setting<key>" with no separating space
    .withDocumentation("when setting " + LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD.key() + " to `sample`, the amount of sampling to be done."
        + "Large sample size leads to better results, at the expense of more memory usage.");

/**
 * Z-order/space-filling curves work best in cooperation with data skipping: with data skipping
 * the query engine can greatly reduce the number of table files to be read; otherwise it can
 * only do row-group skipping inside files (parquet/orc).
 */
public static final ConfigProperty<Boolean> LAYOUT_OPTIMIZE_DATA_SKIPPING_ENABLE = ConfigProperty
    .key(LAYOUT_OPTIMIZE_PARAM_PREFIX + "data.skipping.enable")
    .defaultValue(true)
    .sinceVersion("0.10.0")
    .withDocumentation("Enable data skipping by collecting statistics once layout optimization is complete.");

/**
* @deprecated Use {@link #PLAN_STRATEGY_CLASS_NAME} and its methods instead
*/
Expand Down Expand Up @@ -350,9 +404,58 @@ public Builder withPreserveHoodieCommitMetadata(Boolean preserveHoodieCommitMeta
return this;
}

/**
 * Enables/disables space-filling-curve based layout optimization.
 */
public Builder withSpaceFillingCurveDataOptimizeEnable(Boolean enable) {
  String flag = String.valueOf(enable);
  clusteringConfig.setValue(LAYOUT_OPTIMIZE_ENABLE, flag);
  return this;
}

/**
 * Sets which space-filling curve to apply (e.g. `z-order` or `hilbert`).
 */
public Builder withDataOptimizeStrategy(String strategy) {
  final String curveType = strategy;
  clusteringConfig.setValue(LAYOUT_OPTIMIZE_STRATEGY, curveType);
  return this;
}

/**
 * Sets how the curve is built: `direct` or `sample`.
 */
public Builder withDataOptimizeBuildCurveStrategy(String method) {
  final String buildMethod = method;
  clusteringConfig.setValue(LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD, buildMethod);
  return this;
}

/**
 * Sets how many rows to sample when building the curve with the `sample` method.
 */
public Builder withDataOptimizeBuildCurveSampleNumber(int sampleNumber) {
  String samples = String.valueOf(sampleNumber);
  clusteringConfig.setValue(LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE, samples);
  return this;
}

/**
 * Enables/disables statistics collection for data skipping after layout optimization.
 */
public Builder withDataOptimizeDataSkippingEnable(boolean dataSkipping) {
  String flag = String.valueOf(dataSkipping);
  clusteringConfig.setValue(LAYOUT_OPTIMIZE_DATA_SKIPPING_ENABLE, flag);
  return this;
}

/**
 * Fills in defaults for any unset clustering options and returns the finished config.
 */
public HoodieClusteringConfig build() {
clusteringConfig.setDefaults(HoodieClusteringConfig.class.getName());
return clusteringConfig;
}
}

/**
* strategy types for build z-ordering/space-filling curves.
*/
/**
 * Strategy types for building z-ordering/space-filling curves.
 */
public enum BuildCurveStrategyType {
  DIRECT("direct"),
  SAMPLE("sample");

  // string form accepted by fromValue; kept for symmetry with config values
  private final String value;

  BuildCurveStrategyType(String value) {
    this.value = value;
  }

  /**
   * Parses a config value (case-insensitive) into a strategy type.
   *
   * @throws HoodieException if the value is neither `direct` nor `sample`
   */
  public static BuildCurveStrategyType fromValue(String value) {
    switch (value.toLowerCase(Locale.ROOT)) {
      case "direct":
        return DIRECT;
      case "sample":
        return SAMPLE;
      default:
        // fixed: original message "Invalid value of Type." did not say which value failed
        throw new HoodieException("Invalid value of BuildCurveStrategyType: " + value);
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,30 @@ public String getClusteringSortColumns() {
return getString(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS);
}

/**
 * Data layout optimization properties.
 *
 * @return whether space-filling-curve layout optimization is enabled
 */
public boolean isLayoutOptimizationEnabled() {
  Boolean enabled = getBoolean(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE);
  return enabled;
}

/**
 * @return the configured space-filling curve type (e.g. `z-order`, `hilbert`)
 */
public String getLayoutOptimizationStrategy() {
  String strategy = getString(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY);
  return strategy;
}

/**
 * @return the configured curve build method, parsed into its enum form
 */
public HoodieClusteringConfig.BuildCurveStrategyType getLayoutOptimizationCurveBuildMethod() {
  String method = getString(HoodieClusteringConfig.LAYOUT_OPTIMIZE_CURVE_BUILD_METHOD);
  return HoodieClusteringConfig.BuildCurveStrategyType.fromValue(method);
}

/**
 * @return the number of rows sampled when building the curve with the `sample` method
 */
public int getLayoutOptimizationSampleSize() {
  int sampleSize = getInt(HoodieClusteringConfig.LAYOUT_OPTIMIZE_BUILD_CURVE_SAMPLE_SIZE);
  return sampleSize;
}

/**
 * @return whether statistics are collected for data skipping after layout optimization
 */
public boolean isDataSkippingEnabled() {
  Boolean enabled = getBoolean(HoodieClusteringConfig.LAYOUT_OPTIMIZE_DATA_SKIPPING_ENABLE);
  return enabled;
}

/**
* index properties.
*/
Expand Down Expand Up @@ -1776,6 +1800,7 @@ public static class Builder {
private boolean isStorageConfigSet = false;
private boolean isCompactionConfigSet = false;
private boolean isClusteringConfigSet = false;
private boolean isOptimizeConfigSet = false;
private boolean isMetricsConfigSet = false;
private boolean isBootstrapConfigSet = false;
private boolean isMemoryConfigSet = false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.optimize;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class ZOrderingUtil {

  private ZOrderingUtil() {
    // static utility class; not instantiable
  }

  /**
   * Lexicographically compare two arrays, treating bytes as unsigned.
   * copy from hbase
   * @param buffer1 left operand
   * @param buffer2 right operand
   * @param offset1 Where to start comparing in the left buffer
   * @param offset2 Where to start comparing in the right buffer
   * @param length1 How much to compare from the left buffer
   * @param length2 How much to compare from the right buffer
   * @return 0 if equal, < 0 if left is less than right, > 0 otherwise
   */
  public static int compareTo(byte[] buffer1, int offset1, int length1,
      byte[] buffer2, int offset2, int length2) {
    // Short circuit: identical array and identical window
    if (buffer1 == buffer2
        && offset1 == offset2
        && length1 == length2) {
      return 0;
    }
    // Bring WritableComparator code local
    int end1 = offset1 + length1;
    int end2 = offset2 + length2;
    for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
      // mask with 0xff so bytes compare as unsigned values
      int a = (buffer1[i] & 0xff);
      int b = (buffer2[j] & 0xff);
      if (a != b) {
        return a - b;
      }
    }
    // equal common prefix: the shorter window sorts first
    return length1 - length2;
  }

  /**
   * Left-pads {@code a} with zero bytes up to 8 bytes; inputs longer than
   * 8 bytes are truncated to their first 8 bytes. An 8-byte input is
   * returned as-is (no copy).
   */
  public static byte[] paddingTo8Byte(byte[] a) {
    if (a.length == 8) {
      return a;
    }
    if (a.length > 8) {
      byte[] result = new byte[8];
      System.arraycopy(a, 0, result, 0, 8);
      return result;
    }
    // new byte[8] is already zero-filled; only the tail needs copying
    byte[] result = new byte[8];
    System.arraycopy(a, 0, result, 8 - a.length, a.length);
    return result;
  }

  /**
   * Interleaving array bytes.
   * Interleaving means take one bit from the first matrix element, one bit
   * from the next, etc, then take the second bit from the first matrix
   * element, second bit from the second, all the way to the last bit of the
   * last element.
   * @param buffer candidate elements to interleave; each must have at least {@code size} bytes
   * @param size number of bytes of each element to interleave
   * @return the interleaved bytes, {@code size * buffer.length} bytes long
   */
  public static byte[] interleaving(byte[][] buffer, int size) {
    int candidateSize = buffer.length;
    byte[] result = new byte[size * candidateSize];
    int resBitPos = 0;
    int totalBits = size * 8;
    for (int bitStep = 0; bitStep < totalBits; bitStep++) {
      // plain integer division; the original's Math.floor was a no-op on ints
      int currentBytePos = bitStep / 8;
      int currentBitPos = bitStep % 8;

      for (int i = 0; i < candidateSize; i++) {
        int tempResBytePos = resBitPos / 8;
        int tempResBitPos = resBitPos % 8;
        result[tempResBytePos] = updatePos(result[tempResBytePos], tempResBitPos, buffer[i][currentBytePos], currentBitPos);
        resBitPos++;
      }
    }
    return result;
  }

  /**
   * Copies bit {@code bpos} (0 = most significant) of {@code b} into bit
   * {@code apos} of {@code a} and returns the updated byte.
   */
  public static byte updatePos(byte a, int apos, byte b, int bpos) {
    byte temp = (byte) (b & (1 << (7 - bpos)));
    if (apos < bpos) {
      temp = (byte) (temp << (bpos - apos));
    }
    if (apos > bpos) {
      temp = (byte) (temp >> (apos - bpos));
    }
    byte atemp = (byte) (a & (1 << (7 - apos)));
    if ((byte) (atemp ^ temp) == 0) {
      // target bit already holds the desired value
      return a;
    }
    return (byte) (a ^ (1 << (7 - apos)));
  }

  /** Big-endian 4-byte encoding of an int. */
  public static byte[] toBytes(int val) {
    byte[] b = new byte[4];
    for (int i = 3; i > 0; i--) {
      b[i] = (byte) val;
      val >>>= 8;
    }
    b[0] = (byte) val;
    return b;
  }

  /** Big-endian 8-byte encoding of a long. */
  public static byte[] toBytes(long val) {
    long temp = val;
    byte[] b = new byte[8];
    for (int i = 7; i > 0; i--) {
      b[i] = (byte) temp;
      temp >>>= 8;
    }
    b[0] = (byte) temp;
    return b;
  }

  /** Big-endian 8-byte encoding of a double's raw IEEE-754 bits. */
  public static byte[] toBytes(final double d) {
    return toBytes(Double.doubleToRawLongBits(d));
  }

  /**
   * Encodes an int into 8 bytes whose unsigned lexicographic order matches
   * signed int order (sign bit flipped, then left-padded with zeros).
   */
  public static byte[] intTo8Byte(int a) {
    int temp = a;
    temp = temp ^ (1 << 31);
    return paddingTo8Byte(toBytes(temp));
  }

  public static byte[] byteTo8Byte(byte a) {
    return paddingTo8Byte(new byte[] { a });
  }

  /**
   * Encodes a long into 8 bytes whose unsigned lexicographic order matches
   * signed long order (sign bit flipped).
   */
  public static byte[] longTo8Byte(long a) {
    long temp = a;
    temp = temp ^ (1L << 63);
    return toBytes(temp);
  }

  /**
   * Encodes a double into 8 bytes whose unsigned lexicographic order matches
   * the numeric order of the doubles: values with the IEEE-754 sign bit clear
   * get the sign bit set, values with the sign bit set get all bits flipped.
   * Fixed: the original branched on the double's value ({@code a > 0} / {@code a < 0}),
   * which left 0.0 encoded as all-zero bytes and therefore sorting BELOW every
   * negative number (and left -0.0/NaN unhandled). Operating on the raw bits
   * orders -0.0 just below +0.0 and NaN above all numeric values.
   */
  public static byte[] doubleTo8Byte(double a) {
    long bits = Double.doubleToRawLongBits(a);
    if (bits >= 0) {
      // sign bit clear: set it so non-negative values sort above all negatives
      bits = bits ^ (1L << 63);
    } else {
      // sign bit set: flip all bits so larger negative magnitudes sort lower
      bits = ~bits;
    }
    return toBytes(bits);
  }

  /**
   * UTF-8 bytes of {@code a}, left-padded/truncated to 8 bytes.
   */
  public static byte[] utf8To8Byte(String a) {
    return paddingTo8Byte(a.getBytes(StandardCharsets.UTF_8));
  }

  /**
   * Packs the first 8 UTF-8 bytes of {@code a} (zero-padded) into a long,
   * big-endian, so numeric order of the longs matches byte order of the strings.
   */
  public static Long convertStringToLong(String a) {
    byte[] bytes = utf8To8Byte(a);
    long temp = 0L;
    for (int i = 7; i >= 0; i--) {
      temp = temp | (((long)bytes[i] & 0xff) << (7 - i) * 8);
    }
    return temp;
  }
}

Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,16 @@ public abstract HoodieWriteMetadata<O> bulkInsertPrepped(HoodieEngineContext con
*/
public abstract HoodieWriteMetadata<O> insertOverwriteTable(HoodieEngineContext context, String instantTime, I records);

/**
 * Update statistics info for the current table.
 * To do adaptation once RFC-27 is finished.
 *
 * @param context HoodieEngineContext
 * @param stats write statistics of the files produced by the action
 * @param instantTime Instant time for the replace action
 * @param isOptimizeOperation whether current operation is OPTIMIZE type
 */
public abstract void updateStatistics(HoodieEngineContext context, List<HoodieWriteStat> stats, String instantTime, Boolean isOptimizeOperation);

/**
 * @return the write config this table operates with
 */
public HoodieWriteConfig getConfig() {
return config;
}
Expand Down
Loading