Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.keygen;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

/**
* This class is used to compute a deterministic key for a record based on the contents of the field. Unlike the other KeyGenerators in Hudi, this class does not take in any field names as args to
* create a "keyless" experience for insert only workloads. The keys are guaranteed to be deterministic but not unique, so they can only be used for insert workflows with deduplication disabled.
* The class attempts to get sufficient uniqueness for keys to prevent frequent collisions by choosing the fields it uses in order of decreasing likelihood for uniqueness. The ordering is:
* <ul>
* <li>timestamp</li>
* <li>numeric values</li>
* <li>string, byte arrays, other types not mentioned</li>
* <li>date, lists, maps, booleans</li>
* </ul>
* The number of fields is capped to created predictable performance and the generator only uses non-null values to help increase uniqueness for sparse datasets.
*/
public class KeylessKeyGenerator extends CustomAvroKeyGenerator {
private static final String HOODIE_PREFIX = "_hoodie";
private static final String DOT = ".";
private final int maxFieldsToConsider;
private final int numFieldsForKey;
private final Set<String> partitionFieldNames;
private int[][] fieldOrdering;

public KeylessKeyGenerator(TypedProperties props) {
super(props);
this.numFieldsForKey = props.getInteger(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key(), KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.defaultValue());
// cap the number of fields to order in case of large schemas
this.maxFieldsToConsider = numFieldsForKey * 3;
this.partitionFieldNames = this.getPartitionPathFields().stream().map(field -> field.split(SPLIT_REGEX)[0]).collect(Collectors.toSet());
}

@Override
public String getRecordKey(GenericRecord record) {
return buildKey(getFieldOrdering(record), record);
}

int[][] getFieldOrdering(GenericRecord genericRecord) {
if (fieldOrdering == null) {
fieldOrdering = buildFieldOrdering(genericRecord.getSchema().getFields());
}
return fieldOrdering;
}

/**
* Deterministically builds a key for the input value based on the provided fieldOrdering. The first {@link #numFieldsForKey} non-null values will be used to generate a string that is passed to
* {@link UUID#nameUUIDFromBytes(byte[])}.
* @param fieldOrdering an array of integer arrays. The integer arrays represent paths to a single field within the input object.
* @param input the input object that needs a key
* @return a deterministically generated {@link UUID}
* @param <T> the input object type
*/
private <T> String buildKey(int[][] fieldOrdering, GenericRecord input) {
StringBuilder key = new StringBuilder();
int nonNullFields = 0;
for (int[] index : fieldOrdering) {
Object value = getFieldForRecord(input, index);
if (value == null) {
continue;
}
nonNullFields++;
key.append(value.hashCode());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@the-other-tim-brown @the-other-tim-brown this is incorrect way of hash/key generation, we can't distinguish b/w cases of hash_1=12 and hash_1=1, hash_2=2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean we should append some sort of delimiter after each hashcode?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct

if (nonNullFields >= numFieldsForKey) {
break;
}
}
return UUID.nameUUIDFromBytes(key.toString().getBytes(StandardCharsets.UTF_8)).toString();
}

/**
* Gets the value of the field at the specified path within the record.
* @param record the input record
* @param fieldPath the path to the field as an array of integers representing the field position within the object
* @return value at the path
*/
private static Object getFieldForRecord(GenericRecord record, int[] fieldPath) {
Object value = record;
for (Integer index : fieldPath) {
if (value == null) {
return null;
}
value = ((GenericRecord) value).get(index);
}
return value;
}

private int[][] buildFieldOrdering(List<Schema.Field> initialFields) {
PriorityQueue<Pair<int[], Integer>> queue = new PriorityQueue<>(maxFieldsToConsider + 1, RankingComparator.getInstance());
Queue<FieldToProcess> fieldsToProcess = new ArrayDeque<>();
for (int j = 0; j < initialFields.size(); j++) {
fieldsToProcess.offer(new FieldToProcess(new int[]{j}, initialFields.get(j), initialFields.get(j).name()));
}
while (!fieldsToProcess.isEmpty()) {
FieldToProcess fieldToProcess = fieldsToProcess.poll();
int[] existingPath = fieldToProcess.getIndexPath();
Schema fieldSchema = fieldToProcess.getField().schema();
if (fieldSchema.getType() == Schema.Type.UNION) {
fieldSchema = fieldSchema.getTypes().get(1);
}
if (fieldSchema.getType() == Schema.Type.RECORD) {
List<Schema.Field> nestedFields = fieldSchema.getFields();
for (int i = 0; i < nestedFields.size(); i++) {
int[] path = Arrays.copyOf(existingPath, existingPath.length + 1);
path[existingPath.length] = i;
Schema.Field nestedField = nestedFields.get(i);
fieldsToProcess.add(new FieldToProcess(path, nestedField, fieldToProcess.getNamePath() + DOT + nestedField.name()));
}
} else {
// check that field is not used in partitioning
if (!partitionFieldNames.contains(fieldToProcess.getNamePath())) {
queue.offer(Pair.of(existingPath, getSchemaRanking(fieldToProcess.getField())));
if (queue.size() > maxFieldsToConsider) {
queue.poll();
}
}
}
}
Pair<int[], Integer>[] sortedPairs = queue.toArray(new Pair[queue.size()]);
Arrays.sort(sortedPairs, RankingComparator.getInstance().reversed());
int[][] output = new int[sortedPairs.length][];
for (int k = 0; k < sortedPairs.length; k++) {
output[k] = sortedPairs[k].getLeft();
}
return output;
}

private static class FieldToProcess {
final int[] indexPath;
final Schema.Field field;
final String namePath;

public FieldToProcess(int[] indexPath, Schema.Field field, String namePath) {
this.indexPath = indexPath;
this.field = field;
this.namePath = namePath;
}

public int[] getIndexPath() {
return indexPath;
}

public Schema.Field getField() {
return field;
}

public String getNamePath() {
return namePath;
}
}

/**
* Ranks the fields by their type.
* @param field input field
* @return a score of 0 to 4
*/
private int getSchemaRanking(Schema.Field field) {
if (field.name().startsWith(HOODIE_PREFIX)) {
return 0;
}
Schema schema = field.schema();
if (schema.getType() == Schema.Type.UNION) {
schema = schema.getTypes().get(0).getType() == Schema.Type.NULL ? schema.getTypes().get(1) : schema.getTypes().get(0);
}
Schema.Type type = schema.getType();
switch (type) {
case LONG:
// assumes long with logical type will be a timestamp
return schema.getLogicalType() != null ? 4 : 3;
case INT:
// assumes long with logical type will be a date which will have low variance in a batch
return schema.getLogicalType() != null ? 1 : 3;
case DOUBLE:
case FLOAT:
return 3;
case BOOLEAN:
case MAP:
case ARRAY:
return 1;
default:
return 2;
}
}

private static class RankingComparator implements Comparator<Pair<int[], Integer>> {
private static final RankingComparator INSTANCE = new RankingComparator();

static RankingComparator getInstance() {
return INSTANCE;
}

@Override
public int compare(Pair<int[], Integer> o1, Pair<int[], Integer> o2) {
int initialResult = o1.getRight().compareTo(o2.getRight());
if (initialResult == 0) {
// favor the smaller list (less nested value) on ties
int sizeResult = Integer.compare(o2.getLeft().length, o1.getLeft().length);
if (sizeResult == 0) {
return Integer.compare(o2.getLeft()[0], o1.getLeft()[0]);
}
return sizeResult;
}
return initialResult;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.keygen;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.IOException;

public class TestKeylessKeyGenerator {
private static final long TIME = 1672265446090L;
private static final Schema SCHEMA;

static {
try {
SCHEMA = new Schema.Parser().parse(TestKeylessKeyGenerator.class.getClassLoader().getResourceAsStream("keyless_schema.avsc"));
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}

@Test
public void createKeyWithoutPartitionColumn() {
KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null);
String actualForRecord = keyGenerator.getRecordKey(record);
Assertions.assertEquals("952f0fd4-17b6-3762-b0ea-aa76d36377f1", actualForRecord);
}

@Test
public void createKeyWithPartition() {
KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("integer_field:SIMPLE,partition_field:SIMPLE,nested_struct.doubly_nested:SIMPLE", 3));
GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null);
String actualForRecord = keyGenerator.getRecordKey(record);
Assertions.assertEquals("5c1f9cac-c45d-3b57-9bf7-f745a4bb35c4", actualForRecord);
}

@Test
public void nullFieldsProperlyHandled() {
KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
GenericRecord record = createRecord("partition1", "value1", null, null, null, null);
String actualForRecord = keyGenerator.getRecordKey(record);
Assertions.assertEquals("22dee533-e64f-3694-8242-5ec5f25e6d11", actualForRecord);
}

@Test
public void assertOnlySubsetOfFieldsUsed() {
KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 3));
GenericRecord record1 = createRecord("partition1", "value1", 123, 456L, TIME, null);
String actualForRecord1 = keyGenerator.getRecordKey(record1);
GenericRecord record2 = createRecord("partition2", "value2", 123, 456L, TIME, null);
String actualForRecord2 = keyGenerator.getRecordKey(record2);
Assertions.assertEquals(actualForRecord2, actualForRecord1);
}

@Test
public void numFieldsImpactsKeyGen() {
KeylessKeyGenerator keyGenerator1 = new KeylessKeyGenerator(getKeyGenProperties("", 3));
KeylessKeyGenerator keyGenerator2 = new KeylessKeyGenerator(getKeyGenProperties("", 10));
GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, null);
Assertions.assertNotEquals(keyGenerator1.getRecordKey(record), keyGenerator2.getRecordKey(record));
}

@Test
public void nestedColumnsUsed() {
KeylessKeyGenerator keyGenerator = new KeylessKeyGenerator(getKeyGenProperties("", 10));
GenericRecord record = createRecord("partition1", "value1", 123, 456L, TIME, 20.1);
String actualForRecord = keyGenerator.getRecordKey(record);
Assertions.assertEquals("6bbd8811-6ea1-3ef1-840c-f7a51d8f378c", actualForRecord);
}

protected GenericRecord createRecord(String partitionField, String stringValue, Integer integerValue, Long longValue, Long timestampValue, Double nestedDouble) {
GenericRecord nestedRecord = null;
if (nestedDouble != null) {
nestedRecord = new GenericRecordBuilder(SCHEMA.getField("nested_struct").schema().getTypes().get(1))
.set("doubly_nested", nestedDouble)
.build();
}

return new GenericRecordBuilder(SCHEMA)
.set("partition_field", partitionField)
.set("string_field", stringValue)
.set("integer_field", integerValue)
.set("long_field", longValue)
.set("timestamp_field", timestampValue)
.set("nested_struct", nestedRecord)
.build();
}

protected TypedProperties getKeyGenProperties(String partitionPathField, int numFieldsInKeyGen) {
TypedProperties properties = new TypedProperties();
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), partitionPathField);
properties.put(KeyGeneratorOptions.NUM_FIELDS_IN_KEYLESS_GENERATOR.key(), numFieldsInKeyGen);
properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "");
return properties;
}
}
Loading