Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions hudi-client/hudi-client-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>

<!-- Logging -->
<dependency>
<groupId>log4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.exception;

/**
* Exception thrown for any higher level errors when {@link org.apache.hudi.keygen.KeyGeneratorInterface} is generating
* a {@link org.apache.hudi.common.model.HoodieKey}.
*/
public class HoodieKeyGeneratorException extends HoodieException {

public HoodieKeyGeneratorException(String msg, Throwable e) {
super(msg, e);
}

public HoodieKeyGeneratorException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.keygen;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import java.util.List;
import java.util.stream.Collectors;

public abstract class BaseKeyGenerator extends KeyGenerator {

protected List<String> recordKeyFields;
protected List<String> partitionPathFields;
protected final boolean encodePartitionPath;
protected final boolean hiveStylePartitioning;

protected BaseKeyGenerator(TypedProperties config) {
super(config);
this.encodePartitionPath = config.getBoolean(KeyGeneratorOptions.URL_ENCODE_PARTITIONING_OPT_KEY,
Boolean.parseBoolean(KeyGeneratorOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL));
this.hiveStylePartitioning = config.getBoolean(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY,
Boolean.parseBoolean(KeyGeneratorOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL));
}

/**
* Generate a record Key out of provided generic record.
*/
public abstract String getRecordKey(GenericRecord record);

/**
* Generate a partition path out of provided generic record.
*/
public abstract String getPartitionPath(GenericRecord record);

/**
* Generate a Hoodie Key out of provided generic record.
*/
@Override
public final HoodieKey getKey(GenericRecord record) {
if (getRecordKeyFields() == null || getPartitionPathFields() == null) {
throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg");
}
return new HoodieKey(getRecordKey(record), getPartitionPath(record));
}

@Override
public final List<String> getRecordKeyFieldNames() {
// For nested columns, pick top level column name
return getRecordKeyFields().stream().map(k -> {
int idx = k.indexOf('.');
return idx > 0 ? k.substring(0, idx) : k;
}).collect(Collectors.toList());
}

public List<String> getRecordKeyFields() {
return recordKeyFields;
}

public List<String> getPartitionPathFields() {
return partitionPathFields;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.keygen;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import java.util.Arrays;
import java.util.stream.Collectors;

/**
* Avro complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs.
*/
public class ComplexAvroKeyGenerator extends BaseKeyGenerator {
public static final String DEFAULT_RECORD_KEY_SEPARATOR = ":";

public ComplexAvroKeyGenerator(TypedProperties props) {
super(props);
this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY)
.split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY)
.split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList());
}

@Override
public String getRecordKey(GenericRecord record) {
return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
}

@Override
public String getPartitionPath(GenericRecord record) {
return KeyGenUtils.getRecordPartitionPath(record, getPartitionPathFields(), hiveStylePartitioning, encodePartitionPath);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.keygen;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.exception.HoodieKeyGeneratorException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import java.io.IOException;
import java.util.Arrays;
import java.util.stream.Collectors;

/**
* This is a generic implementation of KeyGenerator where users can configure record key as a single field or a combination of fields. Similarly partition path can be configured to have multiple
* fields or only one field. This class expects value for prop "hoodie.datasource.write.partitionpath.field" in a specific format. For example:
* <p>
* properties.put("hoodie.datasource.write.partitionpath.field", "field1:PartitionKeyType1,field2:PartitionKeyType2").
* <p>
* The complete partition path is created as <value for field1 basis PartitionKeyType1>/<value for field2 basis PartitionKeyType2> and so on.
* <p>
* Few points to consider: 1. If you want to customize some partition path field on a timestamp basis, you can use field1:timestampBased 2. If you simply want to have the value of your configured
* field in the partition path, use field1:simple 3. If you want your table to be non partitioned, simply leave it as blank.
* <p>
* RecordKey is internally generated using either SimpleKeyGenerator or ComplexKeyGenerator.
*/
public class CustomAvroKeyGenerator extends BaseKeyGenerator {

private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
private static final String SPLIT_REGEX = ":";

/**
* Used as a part of config in CustomKeyGenerator.java.
*/
public enum PartitionKeyType {
SIMPLE, TIMESTAMP
}

public CustomAvroKeyGenerator(TypedProperties props) {
super(props);
this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY).split(",")).map(String::trim).collect(Collectors.toList());
this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY).split(",")).map(String::trim).collect(Collectors.toList());
}

@Override
public String getPartitionPath(GenericRecord record) {
if (getPartitionPathFields() == null) {
throw new HoodieKeyException("Unable to find field names for partition path in cfg");
}

String partitionPathField;
StringBuilder partitionPath = new StringBuilder();

//Corresponds to no partition case
if (getPartitionPathFields().size() == 1 && getPartitionPathFields().get(0).isEmpty()) {
return "";
}
for (String field : getPartitionPathFields()) {
String[] fieldWithType = field.split(SPLIT_REGEX);
if (fieldWithType.length != 2) {
throw new HoodieKeyException("Unable to find field names for partition path in proper format");
}

partitionPathField = fieldWithType[0];
PartitionKeyType keyType = PartitionKeyType.valueOf(fieldWithType[1].toUpperCase());
switch (keyType) {
case SIMPLE:
partitionPath.append(new SimpleAvroKeyGenerator(config, partitionPathField).getPartitionPath(record));
break;
case TIMESTAMP:
try {
partitionPath.append(new TimestampBasedAvroKeyGenerator(config, partitionPathField).getPartitionPath(record));
} catch (IOException e) {
throw new HoodieKeyGeneratorException("Unable to initialise TimestampBasedKeyGenerator class");
}
break;
default:
throw new HoodieKeyGeneratorException("Please provide valid PartitionKeyType with fields! You provided: " + keyType);
}
partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
}
partitionPath.deleteCharAt(partitionPath.length() - 1);
return partitionPath.toString();
}

@Override
public String getRecordKey(GenericRecord record) {
validateRecordKeyFields();
return getRecordKeyFields().size() == 1
? new SimpleAvroKeyGenerator(config).getRecordKey(record)
: new ComplexAvroKeyGenerator(config).getRecordKey(record);
}

private void validateRecordKeyFields() {
if (getRecordKeyFields() == null || getRecordKeyFields().isEmpty()) {
throw new HoodieKeyException("Unable to find field names for record key in cfg");
}
}

public String getDefaultPartitionPathSeparator() {
return DEFAULT_PARTITION_PATH_SEPARATOR;
}

public String getSplitRegex() {
return SPLIT_REGEX;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.keygen;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* Avro Key generator for deletes using global indices. Global index deletes do not require partition value so this key generator
* avoids using partition value for generating HoodieKey.
*/
public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator {

private static final String EMPTY_PARTITION = "";

public GlobalAvroDeleteKeyGenerator(TypedProperties config) {
super(config);
this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY).split(","));
}

@Override
public String getRecordKey(GenericRecord record) {
return KeyGenUtils.getRecordKey(record, getRecordKeyFields());
}

@Override
public String getPartitionPath(GenericRecord record) {
return EMPTY_PARTITION;
}

@Override
public List<String> getPartitionPathFields() {
return new ArrayList<>();
}

public String getEmptyPartition() {
return EMPTY_PARTITION;
}
}
Loading