@@ -27,8 +27,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sort.SpaceCurveSortingHelper;
@@ -62,16 +61,12 @@ public RDDSpatialCurveOptimizationSortPartitioner(HoodieSparkEngineContext spark

@Override
public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
String payloadClass = config.getPayloadClass();
// do sort
JavaRDD<GenericRecord> preparedRecord = prepareGenericRecord(records, outputSparkPartitions, serializableSchema.get());
return preparedRecord.map(record -> {
String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
String partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
HoodieKey hoodieKey = new HoodieKey(key, partition);
HoodieRecordPayload avroPayload = ReflectionUtils.loadPayload(payloadClass,
new Object[] {Option.of(record)}, Option.class);
HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, avroPayload);
HoodieRecord hoodieRecord = new HoodieRecord(hoodieKey, new RewriteAvroPayload(record));
return hoodieRecord;
});
}
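
Note on the change above: instead of resolving the configured payload class reflectively for every record (ReflectionUtils.loadPayload inside the map), the partitioner now wraps the already-materialized GenericRecord in a dedicated RewriteAvroPayload. Conceptually, such a payload only needs to hand the wrapped record back. The sketch below illustrates that idea; it is not the actual RewriteAvroPayload implementation, which is not part of this diff and may differ:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;

// Illustrative pass-through payload: simply returns the wrapped Avro record as-is,
// which is all a sort/repartition step needs since the record is already materialized.
public class PassthroughAvroPayload implements HoodieRecordPayload<PassthroughAvroPayload> {
  private final GenericRecord record;

  public PassthroughAvroPayload(GenericRecord record) {
    this.record = record;
  }

  @Override
  public PassthroughAvroPayload preCombine(PassthroughAvroPayload oldValue) {
    return this;
  }

  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) {
    return getInsertValue(schema);
  }

  @Override
  public Option<IndexedRecord> getInsertValue(Schema schema) {
    return Option.of(record);
  }
}
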
@@ -18,19 +18,19 @@

package org.apache.hudi.sort;

import org.apache.hudi.common.util.BinaryUtil;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.optimize.HilbertCurveUtils;
import org.apache.hudi.common.util.BinaryUtil;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Row$;
import org.apache.spark.sql.hudi.execution.RangeSampleSort$;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.hudi.execution.ByteArraySorting;
import org.apache.spark.sql.hudi.execution.RangeSampleSort$;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.BooleanType;
@@ -51,8 +51,9 @@
import org.apache.spark.sql.types.TimestampType;
import org.davidmoten.hilbert.HilbertCurve;
import scala.collection.JavaConversions;
import scala.collection.mutable.WrappedArray;

import java.util.ArrayList;
import javax.annotation.Nonnull;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -126,7 +127,7 @@ public static Dataset<Row> orderDataFrameByMappingValues(
sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, targetPartitionCount);
break;
default:
throw new IllegalArgumentException(String.format("new only support z-order/hilbert optimize but find: %s", layoutOptStrategy));
throw new UnsupportedOperationException(String.format("Not supported layout-optimization strategy (%s)", layoutOptStrategy));
}

// Compose new {@code StructType} for ordered RDDs
@@ -148,51 +149,24 @@ private static StructType composeOrderedRDDStructType(StructType schema) {

private static JavaRDD<Row> createZCurveSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
return originRDD.map(row -> {
List<byte[]> zBytesList = fieldMap.entrySet().stream().map(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
DataType dataType = field.dataType();
if (dataType instanceof LongType) {
return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index));
} else if (dataType instanceof DoubleType) {
return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index));
} else if (dataType instanceof IntegerType) {
return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index));
} else if (dataType instanceof FloatType) {
return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index));
} else if (dataType instanceof StringType) {
return BinaryUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index));
} else if (dataType instanceof DateType) {
return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime());
} else if (dataType instanceof TimestampType) {
return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime());
} else if (dataType instanceof ByteType) {
return BinaryUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index));
} else if (dataType instanceof ShortType) {
return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index));
} else if (dataType instanceof DecimalType) {
return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue());
} else if (dataType instanceof BooleanType) {
boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
return BinaryUtil.intTo8Byte(value ? 1 : 0);
} else if (dataType instanceof BinaryType) {
return BinaryUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index));
}
return null;
}).filter(f -> f != null).collect(Collectors.toList());
byte[][] zBytes = new byte[zBytesList.size()][];
for (int i = 0; i < zBytesList.size(); i++) {
zBytes[i] = zBytesList.get(i);
}
List<Object> zVaules = new ArrayList<>();
zVaules.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
zVaules.add(BinaryUtil.interleaving(zBytes, 8));
return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(zVaules));
byte[][] zBytes = fieldMap.entrySet().stream()
.map(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
return mapColumnValueTo8Bytes(row, index, field.dataType());
})
.toArray(byte[][]::new);

// Interleave received bytes to produce Z-curve ordinal
byte[] zOrdinalBytes = BinaryUtil.interleaving(zBytes, 8);
return appendToRow(row, zOrdinalBytes);
})
.sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);
}
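
For context, BinaryUtil.interleaving weaves the per-column 8-byte keys together bit by bit (Morton/Z-order encoding), so rows that are close on all of the chosen columns also end up close in the resulting byte-wise sort order. The following is a minimal, self-contained sketch of that kind of interleaving; it is illustrative only and not Hudi's BinaryUtil implementation, whose padding and chunking details may differ:

// Round-robins one bit from each fixed-width key into the output (Morton / Z-order code).
static byte[] interleaveBits(byte[][] keys, int keyWidthBytes) {
  byte[] result = new byte[keys.length * keyWidthBytes];
  int outBit = 0;
  for (int bit = 0; bit < keyWidthBytes * 8; bit++) {   // bit position within each key
    for (byte[] key : keys) {                           // take one bit from every column per round
      int value = (key[bit / 8] >> (7 - bit % 8)) & 1;
      result[outBit / 8] |= value << (7 - outBit % 8);
      outBit++;
    }
  }
  return result;
}

Sorting by the interleaved bytes is what lets a single linear ordering preserve locality across all of the ordering columns at once.
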

private static JavaRDD<Row> createHilbertSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
// NOTE: Here {@code mapPartitions} is used to make sure Hilbert curve instance is initialized
// only once per partition
return originRDD.mapPartitions(rows -> {
HilbertCurve hilbertCurve = HilbertCurve.bits(63).dimensions(fieldMap.size());
return new Iterator<Row>() {
@@ -205,48 +179,91 @@ public boolean hasNext() {
@Override
public Row next() {
Row row = rows.next();
List<Long> longList = fieldMap.entrySet().stream().map(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
DataType dataType = field.dataType();
if (dataType instanceof LongType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index);
} else if (dataType instanceof DoubleType) {
return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits(row.getDouble(index));
} else if (dataType instanceof IntegerType) {
return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getInt(index);
} else if (dataType instanceof FloatType) {
return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits((double) row.getFloat(index));
} else if (dataType instanceof StringType) {
return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertStringToLong(row.getString(index));
} else if (dataType instanceof DateType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime();
} else if (dataType instanceof TimestampType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime();
} else if (dataType instanceof ByteType) {
return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertBytesToLong(new byte[] {row.getByte(index)});
} else if (dataType instanceof ShortType) {
return row.isNullAt(index) ? Long.MAX_VALUE : (long)row.getShort(index);
} else if (dataType instanceof DecimalType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue();
} else if (dataType instanceof BooleanType) {
boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
return value ? Long.MAX_VALUE : 0;
} else if (dataType instanceof BinaryType) {
return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertBytesToLong((byte[]) row.get(index));
}
return null;
}).filter(f -> f != null).collect(Collectors.toList());
long[] longs = fieldMap.entrySet().stream()
.mapToLong(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
return mapColumnValueToLong(row, index, field.dataType());
})
.toArray();

byte[] hilbertValue = HilbertCurveUtils.indexBytes(
hilbertCurve, longList.stream().mapToLong(l -> l).toArray(), 63);
List<Object> values = new ArrayList<>();
values.addAll(scala.collection.JavaConverters.bufferAsJavaListConverter(row.toSeq().toBuffer()).asJava());
values.add(hilbertValue);
return Row$.MODULE$.apply(JavaConversions.asScalaBuffer(values));
// Map N-dimensional coordinates into position on the Hilbert curve
byte[] hilbertCurvePosBytes = HilbertCurveUtils.indexBytes(hilbertCurve, longs, 63);
return appendToRow(row, hilbertCurvePosBytes);
}
};
}).sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);
})
.sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);
}
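
HilbertCurveUtils.indexBytes is a Hudi helper over the org.davidmoten hilbert-curve library used above: it maps the per-column long coordinates to the point's index (distance) along a 63-bit Hilbert curve and returns that index as bytes suitable for lexicographic sorting. A rough sketch of what such a helper amounts to is shown below; the fixed padding width is an assumption:

import java.math.BigInteger;
import org.davidmoten.hilbert.HilbertCurve;

// Computes the Hilbert index of a point and left-pads it to a fixed width so that
// byte-wise comparison matches numeric order. Padding details of the Hudi helper may differ.
static byte[] hilbertIndexBytes(HilbertCurve curve, long[] point, int paddedWidth) {
  BigInteger index = curve.index(point);   // position of the point along the curve
  byte[] raw = index.toByteArray();
  byte[] padded = new byte[paddedWidth];
  int len = Math.min(raw.length, paddedWidth);
  System.arraycopy(raw, raw.length - len, padded, paddedWidth - len, len);
  return padded;
}

// e.g. HilbertCurve curve = HilbertCurve.bits(63).dimensions(2);
//      byte[] pos = hilbertIndexBytes(curve, new long[] {3L, 4L}, 16);
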

private static Row appendToRow(Row row, Object value) {
// NOTE: This is an ugly hack to avoid array re-allocation --
// Spark's {@code Row#toSeq} returns array of Objects
Object[] currentValues = (Object[]) ((WrappedArray<Object>) row.toSeq()).array();
return RowFactory.create(CollectionUtils.append(currentValues, value));
}

@Nonnull
private static byte[] mapColumnValueTo8Bytes(Row row, int index, DataType dataType) {
if (dataType instanceof LongType) {
return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index));
} else if (dataType instanceof DoubleType) {
return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index));
} else if (dataType instanceof IntegerType) {
return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index));
} else if (dataType instanceof FloatType) {
return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index));
} else if (dataType instanceof StringType) {
return BinaryUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index));
} else if (dataType instanceof DateType) {
return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime());
} else if (dataType instanceof TimestampType) {
return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime());
} else if (dataType instanceof ByteType) {
return BinaryUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index));
} else if (dataType instanceof ShortType) {
return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index));
} else if (dataType instanceof DecimalType) {
return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue());
} else if (dataType instanceof BooleanType) {
boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
return BinaryUtil.intTo8Byte(value ? 1 : 0);
} else if (dataType instanceof BinaryType) {
return BinaryUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index));
}

throw new UnsupportedOperationException(String.format("Unsupported data-type (%s)", dataType.typeName()));
}

private static long mapColumnValueToLong(Row row, int index, DataType dataType) {
if (dataType instanceof LongType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index);
} else if (dataType instanceof DoubleType) {
return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits(row.getDouble(index));
} else if (dataType instanceof IntegerType) {
return row.isNullAt(index) ? Long.MAX_VALUE : (long) row.getInt(index);
} else if (dataType instanceof FloatType) {
return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits((double) row.getFloat(index));
} else if (dataType instanceof StringType) {
return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertStringToLong(row.getString(index));
} else if (dataType instanceof DateType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime();
} else if (dataType instanceof TimestampType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime();
} else if (dataType instanceof ByteType) {
return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertBytesToLong(new byte[] {row.getByte(index)});
} else if (dataType instanceof ShortType) {
return row.isNullAt(index) ? Long.MAX_VALUE : (long) row.getShort(index);
} else if (dataType instanceof DecimalType) {
return row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue();
} else if (dataType instanceof BooleanType) {
boolean value = row.isNullAt(index) ? false : row.getBoolean(index);
return value ? Long.MAX_VALUE : 0;
} else if (dataType instanceof BinaryType) {
return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertBytesToLong((byte[]) row.get(index));
}

throw new UnsupportedOperationException(String.format("Unsupported data-type (%s)", dataType.typeName()));
}

public static Dataset<Row> orderDataFrameBySamplingValues(
@@ -591,6 +591,7 @@ public static Object getRecordColumnValues(HoodieRecord<? extends HoodieRecordPa
if (columns.length == 1) {
return HoodieAvroUtils.getNestedFieldVal(genericRecord, columns[0], true);
} else {
// TODO this is inefficient, instead we can simply return array of Comparable
StringBuilder sb = new StringBuilder();
for (String col : columns) {
sb.append(HoodieAvroUtils.getNestedFieldValAsString(genericRecord, col, true));
@@ -48,6 +48,7 @@
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import org.apache.hudi.hadoop.FileNameCachingPath;

import java.io.IOException;
import java.net.URI;
@@ -141,7 +142,7 @@ private static Path convertPathWithScheme(Path oldPath, String newScheme) {
try {
newURI = new URI(newScheme, oldURI.getUserInfo(), oldURI.getHost(), oldURI.getPort(), oldURI.getPath(),
oldURI.getQuery(), oldURI.getFragment());
return new Path(newURI);
return new FileNameCachingPath(newURI);
} catch (URISyntaxException e) {
// TODO - Better Exception handling
throw new RuntimeException(e);
@@ -20,6 +20,7 @@

import org.apache.hudi.common.util.collection.Pair;

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -37,11 +38,35 @@ public class CollectionUtils {

public static final Properties EMPTY_PROPERTIES = new Properties();

/**
* Combines provided arrays into one
*/
@SuppressWarnings("unchecked")
public static <T> T[] combine(T[] one, T[] another) {
T[] combined = (T[]) Array.newInstance(one.getClass().getComponentType(), one.length + another.length);
System.arraycopy(one, 0, combined, 0, one.length);
System.arraycopy(another, 0, combined, one.length, another.length);
return combined;
}

/**
* Combines provided array and an element into a new array
*/
@SuppressWarnings("unchecked")
public static <T> T[] append(T[] array, T elem) {
T[] combined = (T[]) Array.newInstance(array.getClass().getComponentType(), array.length + 1);
System.arraycopy(array, 0, combined, 0, array.length);
combined[array.length] = elem;
return combined;
}


/**
* Combines provided {@link List}s into one
*/
public static <E> List<E> combine(List<E> one, List<E> another) {
ArrayList<E> combined = new ArrayList<>(one);
ArrayList<E> combined = new ArrayList<>(one.size() + another.size());
combined.addAll(one);
combined.addAll(another);
return combined;
}
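
A quick usage note on the two new array helpers: Java cannot instantiate a generic T[] directly, hence the Array.newInstance call on the component type. For example:

String[] ab  = CollectionUtils.combine(new String[] {"a"}, new String[] {"b"});  // ["a", "b"]
String[] abc = CollectionUtils.append(ab, "c");                                  // ["a", "b", "c"]
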
@@ -90,7 +90,7 @@ public static long getObjectSize(Object obj) throws UnsupportedOperationExceptio
private final Map<Class<?>, ClassSizeInfo> classSizeInfos = new IdentityHashMap<>();

private final Set<Object> alreadyVisited = Collections.newSetFromMap(new IdentityHashMap<>());
private final Deque<Object> pending = new ArrayDeque<>(16 * 1024);
private final Deque<Object> pending = new ArrayDeque<>(64);
private long size;

/**
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.hadoop;

import org.apache.hadoop.fs.Path;

import java.net.URI;

/**
* NOTE: This class is thread-safe
*/
public class FileNameCachingPath extends Path {

// NOTE: volatile keyword is redundant here and put mostly for reader notice, since all
// reads/writes to references are always atomic (including 64-bit JVMs)
// https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.7
private volatile String fileName;

public FileNameCachingPath(URI aUri) {
super(aUri);
}

@Override
public String getName() {
// This value could be overwritten concurrently and that's okay, since
// {@code Path} is immutable
if (fileName == null) {
fileName = super.getName();
}
return fileName;
}
}
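
Hadoop's Path#getName computes the file name from the URI path (a lastIndexOf plus substring) each time it is called; for listing-heavy code paths that is repeated work, which is what the cache above avoids. The unsynchronized lazy initialization is a benign race: two threads may both compute the value, but they compute the same String, so whichever write wins is correct. A small usage sketch (the S3 path is made up for illustration):

// The name is derived once via Path#getName and then served from the cached field.
Path p = new FileNameCachingPath(URI.create("s3a://bucket/table/2021/part-0001.parquet"));
String name = p.getName();    // first call computes and caches "part-0001.parquet"
String again = p.getName();   // subsequent calls return the cached value
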