diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml index 902de58361d88..487a2e2b3ea8f 100644 --- a/hudi-client/hudi-client-common/pom.xml +++ b/hudi-client/hudi-client-common/pom.xml @@ -50,6 +50,11 @@ ${project.version} + + joda-time + joda-time + + log4j diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieKeyGeneratorException.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieKeyGeneratorException.java new file mode 100644 index 0000000000000..5bf06e2e46bdb --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieKeyGeneratorException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.exception; + +/** + * Exception thrown for any higher level errors when {@link org.apache.hudi.keygen.KeyGeneratorInterface} is generating + * a {@link org.apache.hudi.common.model.HoodieKey}. + */ +public class HoodieKeyGeneratorException extends HoodieException { + + public HoodieKeyGeneratorException(String msg, Throwable e) { + super(msg, e); + } + + public HoodieKeyGeneratorException(String msg) { + super(msg); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java new file mode 100644 index 0000000000000..8020be8ab720b --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/BaseKeyGenerator.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.keygen; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.exception.HoodieKeyException; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +import java.util.List; +import java.util.stream.Collectors; + +public abstract class BaseKeyGenerator extends KeyGenerator { + + protected List recordKeyFields; + protected List partitionPathFields; + protected final boolean encodePartitionPath; + protected final boolean hiveStylePartitioning; + + protected BaseKeyGenerator(TypedProperties config) { + super(config); + this.encodePartitionPath = config.getBoolean(KeyGeneratorOptions.URL_ENCODE_PARTITIONING_OPT_KEY, + Boolean.parseBoolean(KeyGeneratorOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL)); + this.hiveStylePartitioning = config.getBoolean(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, + Boolean.parseBoolean(KeyGeneratorOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL)); + } + + /** + * Generate a record Key out of provided generic record. + */ + public abstract String getRecordKey(GenericRecord record); + + /** + * Generate a partition path out of provided generic record. + */ + public abstract String getPartitionPath(GenericRecord record); + + /** + * Generate a Hoodie Key out of provided generic record. + */ + @Override + public final HoodieKey getKey(GenericRecord record) { + if (getRecordKeyFields() == null || getPartitionPathFields() == null) { + throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg"); + } + return new HoodieKey(getRecordKey(record), getPartitionPath(record)); + } + + @Override + public final List getRecordKeyFieldNames() { + // For nested columns, pick top level column name + return getRecordKeyFields().stream().map(k -> { + int idx = k.indexOf('.'); + return idx > 0 ? k.substring(0, idx) : k; + }).collect(Collectors.toList()); + } + + public List getRecordKeyFields() { + return recordKeyFields; + } + + public List getPartitionPathFields() { + return partitionPathFields; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java new file mode 100644 index 0000000000000..edc1ad9cebc15 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/ComplexAvroKeyGenerator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.keygen; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +import java.util.Arrays; +import java.util.stream.Collectors; + +/** + * Avro complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs. + */ +public class ComplexAvroKeyGenerator extends BaseKeyGenerator { + public static final String DEFAULT_RECORD_KEY_SEPARATOR = ":"; + + public ComplexAvroKeyGenerator(TypedProperties props) { + super(props); + this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY) + .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList()); + this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY) + .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList()); + } + + @Override + public String getRecordKey(GenericRecord record) { + return KeyGenUtils.getRecordKey(record, getRecordKeyFields()); + } + + @Override + public String getPartitionPath(GenericRecord record) { + return KeyGenUtils.getRecordPartitionPath(record, getPartitionPathFields(), hiveStylePartitioning, encodePartitionPath); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java new file mode 100644 index 0000000000000..6266fd15c6b84 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.exception.HoodieKeyException; +import org.apache.hudi.exception.HoodieKeyGeneratorException; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +import java.io.IOException; +import java.util.Arrays; +import java.util.stream.Collectors; + +/** + * This is a generic implementation of KeyGenerator where users can configure record key as a single field or a combination of fields. Similarly partition path can be configured to have multiple + * fields or only one field. This class expects value for prop "hoodie.datasource.write.partitionpath.field" in a specific format. For example: + *
<p>
+ * properties.put("hoodie.datasource.write.partitionpath.field", "field1:PartitionKeyType1,field2:PartitionKeyType2"). + *
<p>
+ * The complete partition path is created as <value for field1 basis PartitionKeyType1>/<value for field2 basis PartitionKeyType2> and so on. + *
<p>
+ * Few points to consider: 1. If you want to customize some partition path field on a timestamp basis, you can use field1:timestampBased 2. If you simply want to have the value of your configured + * field in the partition path, use field1:simple 3. If you want your table to be non partitioned, simply leave it as blank. + *
<p>
+ * RecordKey is internally generated using either SimpleKeyGenerator or ComplexKeyGenerator. + */ +public class CustomAvroKeyGenerator extends BaseKeyGenerator { + + private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/"; + private static final String SPLIT_REGEX = ":"; + + /** + * Used as a part of config in CustomKeyGenerator.java. + */ + public enum PartitionKeyType { + SIMPLE, TIMESTAMP + } + + public CustomAvroKeyGenerator(TypedProperties props) { + super(props); + this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY).split(",")).map(String::trim).collect(Collectors.toList()); + this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY).split(",")).map(String::trim).collect(Collectors.toList()); + } + + @Override + public String getPartitionPath(GenericRecord record) { + if (getPartitionPathFields() == null) { + throw new HoodieKeyException("Unable to find field names for partition path in cfg"); + } + + String partitionPathField; + StringBuilder partitionPath = new StringBuilder(); + + //Corresponds to no partition case + if (getPartitionPathFields().size() == 1 && getPartitionPathFields().get(0).isEmpty()) { + return ""; + } + for (String field : getPartitionPathFields()) { + String[] fieldWithType = field.split(SPLIT_REGEX); + if (fieldWithType.length != 2) { + throw new HoodieKeyException("Unable to find field names for partition path in proper format"); + } + + partitionPathField = fieldWithType[0]; + PartitionKeyType keyType = PartitionKeyType.valueOf(fieldWithType[1].toUpperCase()); + switch (keyType) { + case SIMPLE: + partitionPath.append(new SimpleAvroKeyGenerator(config, partitionPathField).getPartitionPath(record)); + break; + case TIMESTAMP: + try { + partitionPath.append(new TimestampBasedAvroKeyGenerator(config, partitionPathField).getPartitionPath(record)); + } catch (IOException e) { + throw new HoodieKeyGeneratorException("Unable to initialise TimestampBasedKeyGenerator class"); + } + break; + default: + throw new HoodieKeyGeneratorException("Please provide valid PartitionKeyType with fields! You provided: " + keyType); + } + partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR); + } + partitionPath.deleteCharAt(partitionPath.length() - 1); + return partitionPath.toString(); + } + + @Override + public String getRecordKey(GenericRecord record) { + validateRecordKeyFields(); + return getRecordKeyFields().size() == 1 + ? new SimpleAvroKeyGenerator(config).getRecordKey(record) + : new ComplexAvroKeyGenerator(config).getRecordKey(record); + } + + private void validateRecordKeyFields() { + if (getRecordKeyFields() == null || getRecordKeyFields().isEmpty()) { + throw new HoodieKeyException("Unable to find field names for record key in cfg"); + } + } + + public String getDefaultPartitionPathSeparator() { + return DEFAULT_PARTITION_PATH_SEPARATOR; + } + + public String getSplitRegex() { + return SPLIT_REGEX; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java new file mode 100644 index 0000000000000..b074a25450ab6 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/GlobalAvroDeleteKeyGenerator.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
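Note on the CustomAvroKeyGenerator introduced above: the sketch below shows how its `field:type` partition-path format could be exercised end to end. The field names (`id`, `country`, `ts`), the record values, and the two `hoodie.deltastreamer.keygen.timebased.*` property keys are illustrative assumptions, not part of this patch.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.keygen.CustomAvroKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

public class CustomAvroKeyGeneratorSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Record key from a single field; partition path mixes a SIMPLE and a TIMESTAMP component.
    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "id");
    props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "country:simple,ts:timestamp");
    // Timestamp handling for the "ts" component (property keys assumed, matching the
    // "hoodie.deltastreamer.keygen.*" prefix used by TimestampBasedAvroKeyGenerator.Config).
    props.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.type", "EPOCHMILLISECONDS");
    props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd");

    Schema schema = SchemaBuilder.record("example").fields()
        .requiredString("id").requiredString("country").requiredLong("ts").endRecord();
    GenericRecord record = new GenericData.Record(schema);
    record.put("id", "key1");
    record.put("country", "US");
    record.put("ts", 1577836800000L);

    // Expected shape: recordKey "key1", partitionPath roughly "US/2020/01/01"
    // (the separator comes from DEFAULT_PARTITION_PATH_SEPARATOR above).
    HoodieKey key = new CustomAvroKeyGenerator(props).getKey(record);
    System.out.println(key.getRecordKey() + " -> " + key.getPartitionPath());
  }
}
```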
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Avro Key generator for deletes using global indices. Global index deletes do not require partition value so this key generator + * avoids using partition value for generating HoodieKey. + */ +public class GlobalAvroDeleteKeyGenerator extends BaseKeyGenerator { + + private static final String EMPTY_PARTITION = ""; + + public GlobalAvroDeleteKeyGenerator(TypedProperties config) { + super(config); + this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY).split(",")); + } + + @Override + public String getRecordKey(GenericRecord record) { + return KeyGenUtils.getRecordKey(record, getRecordKeyFields()); + } + + @Override + public String getPartitionPath(GenericRecord record) { + return EMPTY_PARTITION; + } + + @Override + public List getPartitionPathFields() { + return new ArrayList<>(); + } + + public String getEmptyPartition() { + return EMPTY_PARTITION; + } +} diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java similarity index 82% rename from hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java index 8e9700b27280c..1f59bab266adb 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java @@ -18,14 +18,20 @@ package org.apache.hudi.keygen; -import java.io.UnsupportedEncodingException; -import java.net.URLEncoder; -import java.nio.charset.StandardCharsets; -import java.util.List; import org.apache.avro.generic.GenericRecord; import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieKeyException; +import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.List; public class KeyGenUtils { @@ -111,4 +117,23 @@ public static String getPartitionPath(GenericRecord record, String partitionPath } return partitionPath; } + + /** + * Create a date time parser class for TimestampBasedKeyGenerator, passing in any configs needed. 
+ */ + public static AbstractHoodieDateTimeParser createDateTimeParser(TypedProperties props, String parserClass) throws IOException { + try { + return (AbstractHoodieDateTimeParser) ReflectionUtils.loadClass(parserClass, props); + } catch (Throwable e) { + throw new IOException("Could not load date time parser class " + parserClass, e); + } + } + + public static void checkRequiredProperties(TypedProperties props, List checkPropNames) { + checkPropNames.forEach(prop -> { + if (!props.containsKey(prop)) { + throw new HoodieNotSupportedException("Required property " + prop + " is missing"); + } + }); + } } \ No newline at end of file diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java similarity index 57% rename from hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java index 3b1db0857a929..8c3f794ee6fa8 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/KeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/KeyGenerator.java @@ -19,30 +19,22 @@ package org.apache.hudi.keygen; import org.apache.hudi.ApiMaturityLevel; -import org.apache.hudi.AvroConversionHelper; import org.apache.hudi.PublicAPIClass; import org.apache.hudi.PublicAPIMethod; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieKey; import org.apache.avro.generic.GenericRecord; -import org.apache.spark.sql.Row; -import scala.Function1; -import java.io.Serializable; import java.util.List; /** * Abstract class to extend for plugging in extraction of {@link HoodieKey} from an Avro record. */ @PublicAPIClass(maturity = ApiMaturityLevel.STABLE) -public abstract class KeyGenerator implements Serializable, SparkKeyGeneratorInterface { - - private static final String STRUCT_NAME = "hoodieRowTopLevelField"; - private static final String NAMESPACE = "hoodieRow"; +public abstract class KeyGenerator implements KeyGeneratorInterface { protected TypedProperties config; - private transient Function1 converterFn = null; protected KeyGenerator(TypedProperties config) { this.config = config; @@ -64,32 +56,4 @@ public List getRecordKeyFieldNames() { throw new UnsupportedOperationException("Bootstrap not supported for key generator. " + "Please override this method in your custom key generator."); } - - /** - * Fetch record key from {@link Row}. - * @param row instance of {@link Row} from which record key is requested. - * @return the record key of interest from {@link Row}. - */ - @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) - public String getRecordKey(Row row) { - if (null == converterFn) { - converterFn = AvroConversionHelper.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE); - } - GenericRecord genericRecord = (GenericRecord) converterFn.apply(row); - return getKey(genericRecord).getRecordKey(); - } - - /** - * Fetch partition path from {@link Row}. - * @param row instance of {@link Row} from which partition path is requested - * @return the partition path of interest from {@link Row}. 
- */ - @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) - public String getPartitionPath(Row row) { - if (null == converterFn) { - converterFn = AvroConversionHelper.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE); - } - GenericRecord genericRecord = (GenericRecord) converterFn.apply(row); - return getKey(genericRecord).getPartitionPath(); - } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java new file mode 100644 index 0000000000000..a5272b38bbff3 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/NonpartitionedAvroKeyGenerator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.config.TypedProperties; + +import java.util.ArrayList; +import java.util.List; + +/** + * Avro simple Key generator for unpartitioned Hive Tables. + */ +public class NonpartitionedAvroKeyGenerator extends SimpleAvroKeyGenerator { + + private static final String EMPTY_PARTITION = ""; + private static final List EMPTY_PARTITION_FIELD_LIST = new ArrayList<>(); + + public NonpartitionedAvroKeyGenerator(TypedProperties props) { + super(props); + } + + @Override + public String getPartitionPath(GenericRecord record) { + return EMPTY_PARTITION; + } + + @Override + public List getPartitionPathFields() { + return EMPTY_PARTITION_FIELD_LIST; + } + + public String getEmptyPartition() { + return EMPTY_PARTITION; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java new file mode 100644 index 0000000000000..59fe6be313030 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/SimpleAvroKeyGenerator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +import java.util.Collections; + +/** + * Avro simple key generator, which takes names of fields to be used for recordKey and partitionPath as configs. + */ +public class SimpleAvroKeyGenerator extends BaseKeyGenerator { + + public SimpleAvroKeyGenerator(TypedProperties props) { + this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY), + props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY)); + } + + SimpleAvroKeyGenerator(TypedProperties props, String partitionPathField) { + this(props, null, partitionPathField); + } + + SimpleAvroKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) { + super(props); + this.recordKeyFields = recordKeyField == null + ? Collections.emptyList() + : Collections.singletonList(recordKeyField); + this.partitionPathFields = Collections.singletonList(partitionPathField); + } + + @Override + public String getRecordKey(GenericRecord record) { + return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0)); + } + + @Override + public String getPartitionPath(GenericRecord record) { + return KeyGenUtils.getPartitionPath(record, getPartitionPathFields().get(0), hiveStylePartitioning, encodePartitionPath); + } +} diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java similarity index 72% rename from hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java index 97a7d2ef6e05d..28048a16b88da 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java @@ -1,13 +1,12 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,19 +17,16 @@ package org.apache.hudi.keygen; -import org.apache.hudi.DataSourceUtils; -import org.apache.hudi.DataSourceWriteOptions; +import org.apache.avro.generic.GenericRecord; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; -import org.apache.hudi.exception.HoodieDeltaStreamerException; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieKeyGeneratorException; import org.apache.hudi.exception.HoodieNotSupportedException; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.keygen.parser.AbstractHoodieDateTimeParser; import org.apache.hudi.keygen.parser.HoodieDateTimeParserImpl; - -import org.apache.avro.generic.GenericRecord; -import org.apache.spark.sql.Row; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; @@ -46,15 +42,11 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH; -import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER; -import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER; /** - * Key generator, that relies on timestamps for partitioning field. Still picks record key by name. + * Avro Key generator, that relies on timestamps for partitioning field. Still picks record key by name. 
*/ -public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { - +public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator { public enum TimestampType implements Serializable { UNIX_TIMESTAMP, DATE_STRING, MIXED, EPOCHMILLISECONDS, SCALAR } @@ -96,19 +88,19 @@ public static class Config { static final String DATE_TIME_PARSER_PROP = "hoodie.deltastreamer.keygen.datetime.parser.class"; } - public TimestampBasedKeyGenerator(TypedProperties config) throws IOException { - this(config, config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()), - config.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY())); + public TimestampBasedAvroKeyGenerator(TypedProperties config) throws IOException { + this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY), + config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY)); } - TimestampBasedKeyGenerator(TypedProperties config, String partitionPathField) throws IOException { + TimestampBasedAvroKeyGenerator(TypedProperties config, String partitionPathField) throws IOException { this(config, null, partitionPathField); } - TimestampBasedKeyGenerator(TypedProperties config, String recordKeyField, String partitionPathField) throws IOException { + TimestampBasedAvroKeyGenerator(TypedProperties config, String recordKeyField, String partitionPathField) throws IOException { super(config, recordKeyField, partitionPathField); String dateTimeParserClass = config.getString(Config.DATE_TIME_PARSER_PROP, HoodieDateTimeParserImpl.class.getName()); - this.parser = DataSourceUtils.createDateTimeParser(config, dateTimeParserClass); + this.parser = KeyGenUtils.createDateTimeParser(config, dateTimeParserClass); this.inputDateTimeZone = parser.getInputDateTimeZone(); this.outputDateTimeZone = parser.getOutputDateTimeZone(); this.outputDateFormat = parser.getOutputDateFormat(); @@ -128,8 +120,8 @@ public TimestampBasedKeyGenerator(TypedProperties config) throws IOException { default: timeUnit = null; } - this.encodePartitionPath = config.getBoolean(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY(), - Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL())); + this.encodePartitionPath = config.getBoolean(KeyGeneratorOptions.URL_ENCODE_PARTITIONING_OPT_KEY, + Boolean.parseBoolean(KeyGeneratorOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL)); } @Override @@ -141,14 +133,14 @@ public String getPartitionPath(GenericRecord record) { try { return getPartitionPath(partitionVal); } catch (Exception e) { - throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + partitionVal, e); + throw new HoodieKeyGeneratorException("Unable to parse input partition field :" + partitionVal, e); } } /** * Set default value to partitionVal if the input value of partitionPathField is null. 
*/ - private Object getDefaultPartitionVal() { + public Object getDefaultPartitionVal() { Object result = 1L; if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) { // since partitionVal is null, we can set a default value of any format as TIMESTAMP_INPUT_DATE_FORMAT_PROP @@ -191,7 +183,7 @@ private void initIfNeeded() { * @param partitionVal partition path object value fetched from record/row * @return the parsed partition path based on data type */ - private String getPartitionPath(Object partitionVal) { + public String getPartitionPath(Object partitionVal) { initIfNeeded(); long timeMs; if (partitionVal instanceof Double) { @@ -202,7 +194,7 @@ private String getPartitionPath(Object partitionVal) { timeMs = convertLongTimeToMillis((Long) partitionVal); } else if (partitionVal instanceof CharSequence) { if (!inputFormatter.isPresent()) { - throw new HoodieException("Missing inputformatter. Ensure " + Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!"); + throw new HoodieException("Missing inputformatter. Ensure " + Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!"); } DateTime parsedDateTime = inputFormatter.get().parseDateTime(partitionVal.toString()); if (this.outputDateTimeZone == null) { @@ -235,27 +227,4 @@ private long convertLongTimeToMillis(Long partitionVal) { return MILLISECONDS.convert(partitionVal, timeUnit); } - @Override - public String getRecordKey(Row row) { - buildFieldPositionMapIfNeeded(row.schema()); - return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeyPositions, false); - } - - @Override - public String getPartitionPath(Row row) { - Object fieldVal = null; - buildFieldPositionMapIfNeeded(row.schema()); - Object partitionPathFieldVal = RowKeyGeneratorHelper.getNestedFieldVal(row, partitionPathPositions.get(getPartitionPathFields().get(0))); - try { - if (partitionPathFieldVal == null || partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER) - || partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) { - fieldVal = getDefaultPartitionVal(); - } else { - fieldVal = partitionPathFieldVal; - } - return getPartitionPath(fieldVal); - } catch (Exception e) { - throw new HoodieDeltaStreamerException("Unable to parse input partition field :" + fieldVal, e); - } - } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java new file mode 100644 index 0000000000000..da567e078790a --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
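For the TimestampBasedAvroKeyGenerator ported above, the DATE_STRING case might be configured as sketched below. The column names are invented, and the sketch assumes the `Config` constants referenced in this diff are publicly accessible:

```java
import java.io.IOException;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

public class TimestampKeyGenSketch {
  public static void main(String[] args) throws IOException {
    TypedProperties props = new TypedProperties();
    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "id");
    props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "created_at");
    // Input values look like "2020-01-01 10:15:00"; partitions are written as "2020/01/01".
    props.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, "DATE_STRING");
    props.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "yyyy-MM-dd HH:mm:ss");
    props.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyy/MM/dd");

    // The constructor resolves the configured date time parser and validates required props.
    TimestampBasedAvroKeyGenerator keyGen = new TimestampBasedAvroKeyGenerator(props);

    // getPartitionPath(Object) is made public in this patch, so the conversion can be exercised directly.
    System.out.println(keyGen.getPartitionPath("2020-01-01 10:15:00")); // expected "2020/01/01"
  }
}
```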
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen.constant; + +public class KeyGeneratorOptions { + + /** + * Flag to indicate whether to use Hive style partitioning. + * If set true, the names of partition folders follow = format. + * By default false (the names of partition folders are only partition values) + */ + public static final String URL_ENCODE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.partitionpath.urlencode"; + public static final String DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL = "false"; + public static final String HIVE_STYLE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.hive_style_partitioning"; + public static final String DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL = "false"; + + /** + * Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value + * will be obtained by invoking .toString() on the field value. Nested fields can be specified using + * the dot notation eg: `a.b.c` + */ + public static final String RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"; + public static final String PARTITIONPATH_FIELD_OPT_KEY = "hoodie.datasource.write.partitionpath.field"; +} + diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java similarity index 88% rename from hudi-spark/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java index 80e26ccb8a5ca..6fb05c30be11a 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/AbstractHoodieDateTimeParser.java @@ -19,7 +19,7 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; -import org.apache.hudi.keygen.TimestampBasedKeyGenerator; +import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormatter; @@ -36,7 +36,7 @@ public AbstractHoodieDateTimeParser(TypedProperties config) { } private String initInputDateFormatDelimiter() { - String inputDateFormatDelimiter = config.getString(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim(); + String inputDateFormatDelimiter = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, ",").trim(); inputDateFormatDelimiter = inputDateFormatDelimiter.isEmpty() ? "," : inputDateFormatDelimiter; return inputDateFormatDelimiter; } @@ -45,7 +45,7 @@ private String initInputDateFormatDelimiter() { * Returns the output date format in which the partition paths will be created for the hudi dataset. 
*/ public String getOutputDateFormat() { - return config.getString(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP); + return config.getString(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP); } /** diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java similarity index 91% rename from hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java rename to hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java index 41452d0047c8f..81960ea168391 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java @@ -17,11 +17,11 @@ package org.apache.hudi.keygen.parser; -import org.apache.hudi.DataSourceUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; -import org.apache.hudi.keygen.TimestampBasedKeyGenerator.Config; -import org.apache.hudi.keygen.TimestampBasedKeyGenerator.TimestampType; +import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.TimestampType; +import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config; +import org.apache.hudi.keygen.KeyGenUtils; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; @@ -42,7 +42,7 @@ public class HoodieDateTimeParserImpl extends AbstractHoodieDateTimeParser { public HoodieDateTimeParserImpl(TypedProperties config) { super(config); - DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP)); + KeyGenUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP)); this.inputDateTimeZone = getInputDateTimeZone(); } @@ -79,7 +79,7 @@ public String getOutputDateFormat() { public Option getInputFormatter() { TimestampType timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP)); if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) { - DataSourceUtils.checkRequiredProperties(config, + KeyGenUtils.checkRequiredProperties(config, Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP)); this.configInputDateFormatList = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, ""); return Option.of(getInputDateFormatter()); diff --git a/hudi-client/hudi-spark-client/pom.xml b/hudi-client/hudi-spark-client/pom.xml index d99346d089d78..5cc6ad6560b6c 100644 --- a/hudi-client/hudi-spark-client/pom.xml +++ b/hudi-client/hudi-spark-client/pom.xml @@ -31,6 +31,13 @@ jar + + + org.scala-lang + scala-library + ${scala.version} + + org.apache.hudi diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java similarity index 61% rename from hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java index 8c973a6ba8284..a0c199138b1a7 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java +++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java @@ -18,70 +18,67 @@ package org.apache.hudi.keygen; -import org.apache.hudi.DataSourceWriteOptions; +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.ApiMaturityLevel; +import org.apache.hudi.AvroConversionHelper; +import org.apache.hudi.PublicAPIMethod; import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.exception.HoodieKeyException; - -import org.apache.avro.generic.GenericRecord; +import org.apache.spark.sql.Row; import org.apache.spark.sql.types.StructType; +import scala.Function1; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; /** - * Base class for all the built-in key generators. Contains methods structured for + * Base class for the built-in key generators. Contains methods structured for * code reuse amongst them. */ -public abstract class BuiltinKeyGenerator extends KeyGenerator { +public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements SparkKeyGeneratorInterface { - protected List recordKeyFields; - protected List partitionPathFields; - protected final boolean encodePartitionPath; - protected final boolean hiveStylePartitioning; + private static final String STRUCT_NAME = "hoodieRowTopLevelField"; + private static final String NAMESPACE = "hoodieRow"; + private transient Function1 converterFn = null; + protected StructType structType; protected Map> recordKeyPositions = new HashMap<>(); protected Map> partitionPathPositions = new HashMap<>(); - protected StructType structType; protected BuiltinKeyGenerator(TypedProperties config) { super(config); - this.encodePartitionPath = config.getBoolean(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY(), - Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL())); - this.hiveStylePartitioning = config.getBoolean(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), - Boolean.parseBoolean(DataSourceWriteOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL())); } /** - * Generate a record Key out of provided generic record. - */ - public abstract String getRecordKey(GenericRecord record); - - /** - * Generate a partition path out of provided generic record. - */ - public abstract String getPartitionPath(GenericRecord record); - - /** - * Generate a Hoodie Key out of provided generic record. + * Fetch record key from {@link Row}. + * @param row instance of {@link Row} from which record key is requested. + * @return the record key of interest from {@link Row}. */ - public final HoodieKey getKey(GenericRecord record) { - if (getRecordKeyFields() == null || getPartitionPathFields() == null) { - throw new HoodieKeyException("Unable to find field names for record key or partition path in cfg"); + @Override + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + public String getRecordKey(Row row) { + if (null == converterFn) { + converterFn = AvroConversionHelper.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE); } - return new HoodieKey(getRecordKey(record), getPartitionPath(record)); + GenericRecord genericRecord = (GenericRecord) converterFn.apply(row); + return getKey(genericRecord).getRecordKey(); } + /** + * Fetch partition path from {@link Row}. + * @param row instance of {@link Row} from which partition path is requested + * @return the partition path of interest from {@link Row}. 
+ */ @Override - public final List getRecordKeyFieldNames() { - // For nested columns, pick top level column name - return getRecordKeyFields().stream().map(k -> { - int idx = k.indexOf('.'); - return idx > 0 ? k.substring(0, idx) : k; - }).collect(Collectors.toList()); + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + public String getPartitionPath(Row row) { + if (null == converterFn) { + converterFn = AvroConversionHelper.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE); + } + GenericRecord genericRecord = (GenericRecord) converterFn.apply(row); + return getKey(genericRecord).getPartitionPath(); } void buildFieldPositionMapIfNeeded(StructType structType) { @@ -119,12 +116,5 @@ void buildFieldPositionMapIfNeeded(StructType structType) { this.structType = structType; } } - - public List getRecordKeyFields() { - return recordKeyFields; - } - - public List getPartitionPathFields() { - return partitionPathFields; - } } + diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java similarity index 61% rename from hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java index e679e99ad9444..36c8345593539 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/ComplexKeyGenerator.java @@ -1,13 +1,12 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,38 +17,38 @@ package org.apache.hudi.keygen; -import org.apache.hudi.DataSourceWriteOptions; -import org.apache.hudi.common.config.TypedProperties; - import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.spark.sql.Row; import java.util.Arrays; import java.util.stream.Collectors; -import org.apache.spark.sql.Row; /** * Complex key generator, which takes names of fields to be used for recordKey and partitionPath as configs. 
*/ public class ComplexKeyGenerator extends BuiltinKeyGenerator { - public static final String DEFAULT_RECORD_KEY_SEPARATOR = ":"; + private final ComplexAvroKeyGenerator complexAvroKeyGenerator; public ComplexKeyGenerator(TypedProperties props) { super(props); - this.recordKeyFields = Arrays.stream(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()) + this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY) .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList()); - this.partitionPathFields = Arrays.stream(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()) + this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY) .split(",")).map(String::trim).filter(s -> !s.isEmpty()).collect(Collectors.toList()); + complexAvroKeyGenerator = new ComplexAvroKeyGenerator(props); } @Override public String getRecordKey(GenericRecord record) { - return KeyGenUtils.getRecordKey(record, getRecordKeyFields()); + return complexAvroKeyGenerator.getRecordKey(record); } @Override public String getPartitionPath(GenericRecord record) { - return KeyGenUtils.getRecordPartitionPath(record, getPartitionPathFields(), hiveStylePartitioning, encodePartitionPath); + return complexAvroKeyGenerator.getPartitionPath(record); } @Override @@ -64,4 +63,5 @@ public String getPartitionPath(Row row) { return RowKeyGeneratorHelper.getPartitionPathFromRow(row, getPartitionPathFields(), hiveStylePartitioning, partitionPathPositions); } -} \ No newline at end of file + +} diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java similarity index 75% rename from hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java index e4576882c5d64..6727b79d78477 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java @@ -18,13 +18,12 @@ package org.apache.hudi.keygen; -import org.apache.hudi.DataSourceWriteOptions; +import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; -import org.apache.hudi.exception.HoodieDeltaStreamerException; import org.apache.hudi.exception.HoodieKeyException; - -import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.exception.HoodieKeyGeneratorException; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.spark.sql.Row; import java.io.IOException; @@ -46,30 +45,36 @@ */ public class CustomKeyGenerator extends BuiltinKeyGenerator { - private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/"; - private static final String SPLIT_REGEX = ":"; - - /** - * Used as a part of config in CustomKeyGenerator.java. 
- */ - public enum PartitionKeyType { - SIMPLE, TIMESTAMP - } + private final CustomAvroKeyGenerator customAvroKeyGenerator; public CustomKeyGenerator(TypedProperties props) { super(props); - this.recordKeyFields = Arrays.stream(props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList()); - this.partitionPathFields = Arrays.stream(props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()).split(",")).map(String::trim).collect(Collectors.toList()); + this.recordKeyFields = Arrays.stream(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY).split(",")).map(String::trim).collect(Collectors.toList()); + this.partitionPathFields = Arrays.stream(props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY).split(",")).map(String::trim).collect(Collectors.toList()); + customAvroKeyGenerator = new CustomAvroKeyGenerator(props); } @Override - public String getPartitionPath(Row row) { - return getPartitionPath(Option.empty(), Option.of(row)); + public String getRecordKey(GenericRecord record) { + return customAvroKeyGenerator.getRecordKey(record); } @Override public String getPartitionPath(GenericRecord record) { - return getPartitionPath(Option.of(record), Option.empty()); + return customAvroKeyGenerator.getPartitionPath(record); + } + + @Override + public String getRecordKey(Row row) { + validateRecordKeyFields(); + return getRecordKeyFields().size() == 1 + ? new SimpleKeyGenerator(config).getRecordKey(row) + : new ComplexKeyGenerator(config).getRecordKey(row); + } + + @Override + public String getPartitionPath(Row row) { + return getPartitionPath(Option.empty(), Option.of(row)); } private String getPartitionPath(Option record, Option row) { @@ -85,13 +90,13 @@ private String getPartitionPath(Option record, Option row) { return ""; } for (String field : getPartitionPathFields()) { - String[] fieldWithType = field.split(SPLIT_REGEX); + String[] fieldWithType = field.split(customAvroKeyGenerator.getSplitRegex()); if (fieldWithType.length != 2) { - throw new HoodieKeyException("Unable to find field names for partition path in proper format"); + throw new HoodieKeyGeneratorException("Unable to find field names for partition path in proper format"); } partitionPathField = fieldWithType[0]; - PartitionKeyType keyType = PartitionKeyType.valueOf(fieldWithType[1].toUpperCase()); + CustomAvroKeyGenerator.PartitionKeyType keyType = CustomAvroKeyGenerator.PartitionKeyType.valueOf(fieldWithType[1].toUpperCase()); switch (keyType) { case SIMPLE: if (record.isPresent()) { @@ -108,38 +113,23 @@ private String getPartitionPath(Option record, Option row) { partitionPath.append(new TimestampBasedKeyGenerator(config, partitionPathField).getPartitionPath(row.get())); } } catch (IOException ioe) { - throw new HoodieDeltaStreamerException("Unable to initialise TimestampBasedKeyGenerator class"); + throw new HoodieKeyGeneratorException("Unable to initialise TimestampBasedKeyGenerator class"); } break; default: - throw new HoodieDeltaStreamerException("Please provide valid PartitionKeyType with fields! You provided: " + keyType); + throw new HoodieKeyGeneratorException("Please provide valid PartitionKeyType with fields! 
You provided: " + keyType); } - partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR); + partitionPath.append(customAvroKeyGenerator.getDefaultPartitionPathSeparator()); } partitionPath.deleteCharAt(partitionPath.length() - 1); return partitionPath.toString(); } - @Override - public String getRecordKey(GenericRecord record) { - validateRecordKeyFields(); - return getRecordKeyFields().size() == 1 - ? new SimpleKeyGenerator(config).getRecordKey(record) - : new ComplexKeyGenerator(config).getRecordKey(record); - } - - @Override - public String getRecordKey(Row row) { - validateRecordKeyFields(); - return getRecordKeyFields().size() == 1 - ? new SimpleKeyGenerator(config).getRecordKey(row) - : new ComplexKeyGenerator(config).getRecordKey(row); - } - private void validateRecordKeyFields() { if (getRecordKeyFields() == null || getRecordKeyFields().isEmpty()) { throw new HoodieKeyException("Unable to find field names for record key in cfg"); } } } + diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java similarity index 78% rename from hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java index 243493b45291a..5c9a813a2c314 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/GlobalDeleteKeyGenerator.java @@ -18,10 +18,9 @@ package org.apache.hudi.keygen; -import org.apache.hudi.DataSourceWriteOptions; -import org.apache.hudi.common.config.TypedProperties; - import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.spark.sql.Row; import java.util.ArrayList; @@ -34,21 +33,21 @@ */ public class GlobalDeleteKeyGenerator extends BuiltinKeyGenerator { - private static final String EMPTY_PARTITION = ""; - + private final GlobalAvroDeleteKeyGenerator globalAvroDeleteKeyGenerator; public GlobalDeleteKeyGenerator(TypedProperties config) { super(config); - this.recordKeyFields = Arrays.asList(config.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()).split(",")); + this.recordKeyFields = Arrays.asList(config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY).split(",")); + globalAvroDeleteKeyGenerator = new GlobalAvroDeleteKeyGenerator(config); } @Override public String getRecordKey(GenericRecord record) { - return KeyGenUtils.getRecordKey(record, getRecordKeyFields()); + return globalAvroDeleteKeyGenerator.getRecordKey(record); } @Override public String getPartitionPath(GenericRecord record) { - return EMPTY_PARTITION; + return globalAvroDeleteKeyGenerator.getPartitionPath(record); } @Override @@ -64,6 +63,7 @@ public String getRecordKey(Row row) { @Override public String getPartitionPath(Row row) { - return EMPTY_PARTITION; + return globalAvroDeleteKeyGenerator.getEmptyPartition(); } } + diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java similarity index 75% rename from hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java index 
db51024017d12..543e1349e9f7c 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/NonpartitionedKeyGenerator.java @@ -18,12 +18,10 @@ package org.apache.hudi.keygen; -import org.apache.hudi.common.config.TypedProperties; - import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.config.TypedProperties; import org.apache.spark.sql.Row; -import java.util.ArrayList; import java.util.List; /** @@ -31,25 +29,27 @@ */ public class NonpartitionedKeyGenerator extends SimpleKeyGenerator { - private static final String EMPTY_PARTITION = ""; - private static final List EMPTY_PARTITION_FIELD_LIST = new ArrayList<>(); + private final NonpartitionedAvroKeyGenerator nonpartitionedAvroKeyGenerator; - public NonpartitionedKeyGenerator(TypedProperties props) { - super(props); + public NonpartitionedKeyGenerator(TypedProperties config) { + super(config); + nonpartitionedAvroKeyGenerator = new NonpartitionedAvroKeyGenerator(config); } @Override public String getPartitionPath(GenericRecord record) { - return EMPTY_PARTITION; + return nonpartitionedAvroKeyGenerator.getPartitionPath(record); } @Override public List getPartitionPathFields() { - return EMPTY_PARTITION_FIELD_LIST; + return nonpartitionedAvroKeyGenerator.getPartitionPathFields(); } @Override public String getPartitionPath(Row row) { - return EMPTY_PARTITION; + return nonpartitionedAvroKeyGenerator.getEmptyPartition(); } + } + diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java similarity index 99% rename from hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java index 4c05489ce840b..dd0d4c5c5318b 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/RowKeyGeneratorHelper.java @@ -19,10 +19,10 @@ package org.apache.hudi.keygen; import org.apache.hudi.exception.HoodieKeyException; - import org.apache.spark.sql.Row; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import scala.Option; import java.util.ArrayList; import java.util.Arrays; @@ -33,8 +33,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import scala.Option; - import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH; import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR; import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER; diff --git a/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java similarity index 82% rename from hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java index c2b8b12f93cad..332686d378c0b 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java @@ -18,10 +18,9 @@ package org.apache.hudi.keygen; -import org.apache.hudi.DataSourceWriteOptions; -import org.apache.hudi.common.config.TypedProperties; - 
import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.spark.sql.Row; import java.util.Collections; @@ -31,9 +30,11 @@ */ public class SimpleKeyGenerator extends BuiltinKeyGenerator { + private final SimpleAvroKeyGenerator simpleAvroKeyGenerator; + public SimpleKeyGenerator(TypedProperties props) { - this(props, props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()), - props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY())); + this(props, props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY), + props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY)); } SimpleKeyGenerator(TypedProperties props, String partitionPathField) { @@ -46,16 +47,17 @@ public SimpleKeyGenerator(TypedProperties props) { ? Collections.emptyList() : Collections.singletonList(recordKeyField); this.partitionPathFields = Collections.singletonList(partitionPathField); + simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField, partitionPathField); } @Override public String getRecordKey(GenericRecord record) { - return KeyGenUtils.getRecordKey(record, getRecordKeyFields().get(0)); + return simpleAvroKeyGenerator.getRecordKey(record); } @Override public String getPartitionPath(GenericRecord record) { - return KeyGenUtils.getPartitionPath(record, getPartitionPathFields().get(0), hiveStylePartitioning, encodePartitionPath); + return simpleAvroKeyGenerator.getPartitionPath(record); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java new file mode 100644 index 0000000000000..859269c751a80 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.exception.HoodieKeyGeneratorException; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.spark.sql.Row; + +import java.io.IOException; + +import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH; +import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER; +import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER; + +/** + * Key generator, that relies on timestamps for partitioning field. Still picks record key by name. 
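+ * <p>A minimal usage sketch (the timestamp type and date-format values below are illustrative placeholders,
+ * not defaults from this patch; option names come from {@link org.apache.hudi.keygen.constant.KeyGeneratorOptions}
+ * and {@link TimestampBasedAvroKeyGenerator.Config}):
+ * <pre>
+ *   // given an Avro GenericRecord avroRecord and a Spark Row row
+ *   TypedProperties props = new TypedProperties();
+ *   props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "field1");
+ *   props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "createTime");
+ *   props.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, "DATE_STRING");            // placeholder type
+ *   props.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "yyyy-MM-dd HH:mm:ss"); // placeholder format
+ *   props.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyyMMddHH");
+ *   TimestampBasedKeyGenerator keyGen = new TimestampBasedKeyGenerator(props); // IOException if the timestamp config is invalid
+ *   HoodieKey key = keyGen.getKey(avroRecord);        // Avro path delegates to TimestampBasedAvroKeyGenerator
+ *   String partition = keyGen.getPartitionPath(row);  // Row path resolves the field, then reuses the Avro generator's formatting
+ * </pre>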
+ */ +public class TimestampBasedKeyGenerator extends SimpleKeyGenerator { + + private final TimestampBasedAvroKeyGenerator timestampBasedAvroKeyGenerator; + + public TimestampBasedKeyGenerator(TypedProperties config) throws IOException { + this(config, config.getString(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY), + config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY)); + } + + TimestampBasedKeyGenerator(TypedProperties config, String partitionPathField) throws IOException { + this(config, null, partitionPathField); + } + + TimestampBasedKeyGenerator(TypedProperties config, String recordKeyField, String partitionPathField) throws IOException { + super(config, recordKeyField, partitionPathField); + timestampBasedAvroKeyGenerator = new TimestampBasedAvroKeyGenerator(config, recordKeyField, partitionPathField); + } + + @Override + public String getPartitionPath(GenericRecord record) { + return timestampBasedAvroKeyGenerator.getPartitionPath(record); + } + + @Override + public String getRecordKey(Row row) { + buildFieldPositionMapIfNeeded(row.schema()); + return RowKeyGeneratorHelper.getRecordKeyFromRow(row, getRecordKeyFields(), recordKeyPositions, false); + } + + @Override + public String getPartitionPath(Row row) { + Object fieldVal = null; + buildFieldPositionMapIfNeeded(row.schema()); + Object partitionPathFieldVal = RowKeyGeneratorHelper.getNestedFieldVal(row, partitionPathPositions.get(getPartitionPathFields().get(0))); + try { + if (partitionPathFieldVal == null || partitionPathFieldVal.toString().contains(DEFAULT_PARTITION_PATH) || partitionPathFieldVal.toString().contains(NULL_RECORDKEY_PLACEHOLDER) + || partitionPathFieldVal.toString().contains(EMPTY_RECORDKEY_PLACEHOLDER)) { + fieldVal = timestampBasedAvroKeyGenerator.getDefaultPartitionVal(); + } else { + fieldVal = partitionPathFieldVal; + } + return timestampBasedAvroKeyGenerator.getPartitionPath(fieldVal); + } catch (Exception e) { + throw new HoodieKeyGeneratorException("Unable to parse input partition field :" + fieldVal, e); + } + } +} \ No newline at end of file diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala similarity index 96% rename from hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala rename to hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala index c701e708bcd02..db1ca6f94c3f6 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala @@ -1,12 +1,13 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -23,10 +24,10 @@ import java.util import org.apache.avro.Conversions.DecimalConversion import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis} -import org.apache.avro.{LogicalTypes, Schema} import org.apache.avro.Schema.Type._ import org.apache.avro.generic.GenericData.{Fixed, Record} import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord} +import org.apache.avro.{LogicalTypes, Schema} import org.apache.spark.sql.Row import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters} import org.apache.spark.sql.catalyst.expressions.GenericRow diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala similarity index 88% rename from hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala rename to hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index 70a135624ffd3..d1a4249f7962a 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -1,12 +1,13 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,14 +18,14 @@ package org.apache.hudi -import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} -import org.apache.hudi.common.model.HoodieKey import org.apache.avro.Schema +import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord} import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.common.model.HoodieKey import org.apache.spark.rdd.RDD import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import scala.collection.JavaConverters._ diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java new file mode 100644 index 0000000000000..54f4ffaef9dd2 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.keygen; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.exception.HoodieKeyException; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.testutils.KeyGeneratorTestUtilities; +import org.apache.spark.sql.Row; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static junit.framework.TestCase.assertEquals; + +public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities { + + private TypedProperties getCommonProps(boolean getComplexRecordKey) { + TypedProperties properties = new TypedProperties(); + if (getComplexRecordKey) { + properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key, pii_col"); + } else { + properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key"); + } + properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true"); + return properties; + } + + private TypedProperties getPropertiesWithoutPartitionPathProp() { + return getCommonProps(false); + } + + private TypedProperties getPropertiesWithoutRecordKeyProp() { + TypedProperties properties = new TypedProperties(); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp"); + return properties; + } + + private TypedProperties getWrongRecordKeyFieldProps() { + TypedProperties properties = new TypedProperties(); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp"); + properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_wrong_key"); + return properties; + } + + private TypedProperties getProps() { + TypedProperties properties = getCommonProps(true); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp,ts_ms"); + return properties; + } + + @Test + public void testNullPartitionPathFields() { + Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutPartitionPathProp())); + } + + @Test + public void testNullRecordKeyFields() { + Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp())); + } + + @Test + public void testWrongRecordKeyField() { + ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getWrongRecordKeyFieldProps()); + Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord())); + Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType)); + } + + @Test + public void testHappyFlow() { + ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getProps()); + GenericRecord record = getRecord(); + HoodieKey key = keyGenerator.getKey(record); + Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi"); + Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686/ts_ms=2020-03-21"); + Row row = KeyGeneratorTestUtilities.getRow(record); + Assertions.assertEquals(keyGenerator.getRecordKey(row), "_row_key:key1,pii_col:pi"); + Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686/ts_ms=2020-03-21"); + } + + @Test + public void testSingleValueKeyGenerator() { + TypedProperties properties = new TypedProperties(); + properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key"); + 
properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp"); + ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties); + assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 1); + assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 1); + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + GenericRecord record = dataGenerator.generateGenericRecords(1).get(0); + String rowKey = record.get("_row_key").toString(); + String partitionPath = record.get("timestamp").toString(); + HoodieKey hoodieKey = compositeKeyGenerator.getKey(record); + assertEquals("_row_key:" + rowKey, hoodieKey.getRecordKey()); + assertEquals(partitionPath, hoodieKey.getPartitionPath()); + } + + @Test + public void testMultipleValueKeyGenerator() { + TypedProperties properties = new TypedProperties(); + properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key,timestamp"); + properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "rider,driver"); + ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties); + assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 2); + assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 2); + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + GenericRecord record = dataGenerator.generateGenericRecords(1).get(0); + String rowKey = + "_row_key" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("_row_key").toString() + "," + + "timestamp" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString(); + String partitionPath = record.get("rider").toString() + "/" + record.get("driver").toString(); + HoodieKey hoodieKey = compositeKeyGenerator.getKey(record); + assertEquals(rowKey, hoodieKey.getRecordKey()); + assertEquals(partitionPath, hoodieKey.getPartitionPath()); + } + + @Test + public void testMultipleValueKeyGeneratorNonPartitioned() { + TypedProperties properties = new TypedProperties(); + properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key,timestamp"); + properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, ""); + ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties); + assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 2); + assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 0); + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + GenericRecord record = dataGenerator.generateGenericRecords(1).get(0); + String rowKey = + "_row_key" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("_row_key").toString() + "," + + "timestamp" + ComplexAvroKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString(); + String partitionPath = ""; + HoodieKey hoodieKey = compositeKeyGenerator.getKey(record); + assertEquals(rowKey, hoodieKey.getRecordKey()); + assertEquals(partitionPath, hoodieKey.getPartitionPath()); + } +} diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java similarity index 77% rename from hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java index add2547e7e4c8..dc30b932e9d2b 100644 --- 
a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestCustomKeyGenerator.java @@ -18,11 +18,11 @@ package org.apache.hudi.keygen; -import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.config.TypedProperties; import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.testutils.KeyGeneratorTestUtilities; import org.apache.spark.sql.Row; import org.junit.jupiter.api.Assertions; @@ -33,48 +33,48 @@ public class TestCustomKeyGenerator extends KeyGeneratorTestUtilities { private TypedProperties getCommonProps(boolean getComplexRecordKey) { TypedProperties properties = new TypedProperties(); if (getComplexRecordKey) { - properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key, pii_col"); + properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key, pii_col"); } else { - properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key"); + properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key"); } - properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true"); + properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true"); return properties; } private TypedProperties getPropertiesForSimpleKeyGen() { TypedProperties properties = getCommonProps(false); - properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple"); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple"); return properties; } private TypedProperties getImproperPartitionFieldFormatProp() { TypedProperties properties = getCommonProps(false); - properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp"); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp"); return properties; } private TypedProperties getInvalidPartitionKeyTypeProps() { TypedProperties properties = getCommonProps(false); - properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:dummy"); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:dummy"); return properties; } private TypedProperties getComplexRecordKeyWithSimplePartitionProps() { TypedProperties properties = getCommonProps(true); - properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple"); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple"); return properties; } private TypedProperties getComplexRecordKeyAndPartitionPathProps() { TypedProperties properties = getCommonProps(true); - properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple,ts_ms:timestamp"); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple,ts_ms:timestamp"); populateNecessaryPropsForTimestampBasedKeyGen(properties); return properties; } private TypedProperties getPropsWithoutRecordKeyFieldProps() { TypedProperties properties = new TypedProperties(); - properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp:simple"); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp:simple"); return properties; } @@ -86,20 +86,20 @@ private void populateNecessaryPropsForTimestampBasedKeyGen(TypedProperties prope private TypedProperties getPropertiesForTimestampBasedKeyGen() { TypedProperties properties = 
getCommonProps(false); - properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "ts_ms:timestamp"); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "ts_ms:timestamp"); populateNecessaryPropsForTimestampBasedKeyGen(properties); return properties; } private TypedProperties getPropertiesForNonPartitionedKeyGen() { TypedProperties properties = getCommonProps(false); - properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), ""); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, ""); return properties; } @Test public void testSimpleKeyGenerator() { - KeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForSimpleKeyGen()); + BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForSimpleKeyGen()); GenericRecord record = getRecord(); HoodieKey key = keyGenerator.getKey(record); Assertions.assertEquals(key.getRecordKey(), "key1"); @@ -111,7 +111,7 @@ public void testSimpleKeyGenerator() { @Test public void testTimestampBasedKeyGenerator() { - KeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForTimestampBasedKeyGen()); + BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForTimestampBasedKeyGen()); GenericRecord record = getRecord(); HoodieKey key = keyGenerator.getKey(record); Assertions.assertEquals(key.getRecordKey(), "key1"); @@ -123,7 +123,7 @@ public void testTimestampBasedKeyGenerator() { @Test public void testNonPartitionedKeyGenerator() { - KeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForNonPartitionedKeyGen()); + BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropertiesForNonPartitionedKeyGen()); GenericRecord record = getRecord(); HoodieKey key = keyGenerator.getKey(record); Assertions.assertEquals(key.getRecordKey(), "key1"); @@ -136,28 +136,28 @@ public void testNonPartitionedKeyGenerator() { @Test public void testInvalidPartitionKeyType() { try { - KeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps()); + BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps()); keyGenerator.getKey(getRecord()); Assertions.fail("should fail when invalid PartitionKeyType is provided!"); } catch (Exception e) { - Assertions.assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.keygen.CustomKeyGenerator.PartitionKeyType.DUMMY")); + Assertions.assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY")); } try { - KeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps()); + BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getInvalidPartitionKeyTypeProps()); GenericRecord record = getRecord(); Row row = KeyGeneratorTestUtilities.getRow(record); keyGenerator.getPartitionPath(row); Assertions.fail("should fail when invalid PartitionKeyType is provided!"); } catch (Exception e) { - Assertions.assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.keygen.CustomKeyGenerator.PartitionKeyType.DUMMY")); + Assertions.assertTrue(e.getMessage().contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY")); } } @Test public void testNoRecordKeyFieldProp() { try { - KeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps()); + BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps()); keyGenerator.getKey(getRecord()); Assertions.fail("should fail when record key field 
is not provided!"); } catch (Exception e) { @@ -165,7 +165,7 @@ public void testNoRecordKeyFieldProp() { } try { - KeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps()); + BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getPropsWithoutRecordKeyFieldProps()); GenericRecord record = getRecord(); Row row = KeyGeneratorTestUtilities.getRow(record); keyGenerator.getRecordKey(row); @@ -178,7 +178,7 @@ public void testNoRecordKeyFieldProp() { @Test public void testPartitionFieldsInImproperFormat() { try { - KeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp()); + BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp()); keyGenerator.getKey(getRecord()); Assertions.fail("should fail when partition key field is provided in improper format!"); } catch (Exception e) { @@ -186,7 +186,7 @@ public void testPartitionFieldsInImproperFormat() { } try { - KeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp()); + BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getImproperPartitionFieldFormatProp()); GenericRecord record = getRecord(); Row row = KeyGeneratorTestUtilities.getRow(record); keyGenerator.getPartitionPath(row); @@ -198,7 +198,7 @@ public void testPartitionFieldsInImproperFormat() { @Test public void testComplexRecordKeyWithSimplePartitionPath() { - KeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyWithSimplePartitionProps()); + BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyWithSimplePartitionProps()); GenericRecord record = getRecord(); HoodieKey key = keyGenerator.getKey(record); Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi"); @@ -211,7 +211,7 @@ public void testComplexRecordKeyWithSimplePartitionPath() { @Test public void testComplexRecordKeysWithComplexPartitionPath() { - KeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyAndPartitionPathProps()); + BuiltinKeyGenerator keyGenerator = new CustomKeyGenerator(getComplexRecordKeyAndPartitionPathProps()); GenericRecord record = getRecord(); HoodieKey key = keyGenerator.getKey(record); Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi"); diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java similarity index 84% rename from hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java index 96d607af8cae1..078101b4a6317 100644 --- a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestGlobalDeleteKeyGenerator.java @@ -18,12 +18,12 @@ package org.apache.hudi.keygen; -import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.exception.HoodieKeyException; import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.testutils.KeyGeneratorTestUtilities; import org.apache.spark.sql.Row; import org.junit.jupiter.api.Assertions; @@ -34,29 +34,29 @@ public class TestGlobalDeleteKeyGenerator extends KeyGeneratorTestUtilities { private 
TypedProperties getCommonProps(boolean getComplexRecordKey) { TypedProperties properties = new TypedProperties(); if (getComplexRecordKey) { - properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,pii_col"); + properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key,pii_col"); } else { - properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key"); + properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key"); } - properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true"); + properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true"); return properties; } private TypedProperties getPropertiesWithoutRecordKeyProp() { TypedProperties properties = new TypedProperties(); - properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp"); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp"); return properties; } private TypedProperties getWrongRecordKeyFieldProps() { TypedProperties properties = new TypedProperties(); - properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_wrong_key"); + properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_wrong_key"); return properties; } private TypedProperties getProps() { TypedProperties properties = getCommonProps(true); - properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp,ts_ms"); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp,ts_ms"); return properties; } diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java similarity index 82% rename from hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java index 4eb184e26f499..80b85d8ee1046 100644 --- a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestSimpleKeyGenerator.java @@ -18,12 +18,12 @@ package org.apache.hudi.keygen; -import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.exception.HoodieKeyException; import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.testutils.KeyGeneratorTestUtilities; import org.apache.spark.sql.Row; import org.junit.jupiter.api.Assertions; @@ -33,8 +33,8 @@ public class TestSimpleKeyGenerator extends KeyGeneratorTestUtilities { private TypedProperties getCommonProps() { TypedProperties properties = new TypedProperties(); - properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key"); - properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true"); + properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key"); + properties.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true"); return properties; } @@ -44,34 +44,34 @@ private TypedProperties getPropertiesWithoutPartitionPathProp() { private TypedProperties getPropertiesWithoutRecordKeyProp() { TypedProperties properties = new TypedProperties(); - properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp"); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp"); return 
properties; } private TypedProperties getWrongRecordKeyFieldProps() { TypedProperties properties = new TypedProperties(); - properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp"); - properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_wrong_key"); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp"); + properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_wrong_key"); return properties; } private TypedProperties getWrongPartitionPathFieldProps() { TypedProperties properties = new TypedProperties(); - properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "_wrong_partition_path"); - properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key"); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "_wrong_partition_path"); + properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key"); return properties; } private TypedProperties getComplexRecordKeyProp() { TypedProperties properties = new TypedProperties(); - properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp"); - properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,pii_col"); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp"); + properties.put(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key,pii_col"); return properties; } private TypedProperties getProps() { TypedProperties properties = getCommonProps(); - properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp"); + properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp"); return properties; } diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java similarity index 84% rename from hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java rename to hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java index 78674153e3783..98a8f67d6119c 100644 --- a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/TestTimestampBasedKeyGenerator.java @@ -20,14 +20,14 @@ import org.apache.hudi.AvroConversionHelper; import org.apache.hudi.AvroConversionUtils; -import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.testutils.SchemaTestUtil; -import org.apache.hudi.exception.HoodieDeltaStreamerException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.exception.HoodieKeyGeneratorException; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; import org.apache.spark.sql.types.StructType; @@ -58,15 +58,15 @@ public void initialize() throws IOException { .generateAvroRecordFromJson(schema, 1, "001", "f1"); baseRow = genericRecordToRow(baseRecord); - properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "field1"); - properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "createTime"); - properties.setProperty(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "false"); + 
properties.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "field1"); + properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "createTime"); + properties.setProperty(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "false"); } private TypedProperties getBaseKeyConfig(String timestampType, String dateFormat, String timezone, String scalarType) { - properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType); - properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, dateFormat); - properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, timezone); + properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType); + properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, dateFormat); + properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TIMEZONE_FORMAT_PROP, timezone); if (scalarType != null) { properties.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit", scalarType); @@ -88,22 +88,22 @@ private Row genericRecordToRow(GenericRecord baseRecord) { private TypedProperties getBaseKeyConfig(String timestampType, String inputFormatList, String inputFormatDelimiterRegex, String inputTimezone, String outputFormat, String outputTimezone) { if (timestampType != null) { - properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType); + properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_TYPE_FIELD_PROP, timestampType); } if (inputFormatList != null) { - properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, inputFormatList); + properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, inputFormatList); } if (inputFormatDelimiterRegex != null) { - properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, inputFormatDelimiterRegex); + properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_DATE_FORMAT_LIST_DELIMITER_REGEX_PROP, inputFormatDelimiterRegex); } if (inputTimezone != null) { - properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP, inputTimezone); + properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_INPUT_TIMEZONE_FORMAT_PROP, inputTimezone); } if (outputFormat != null) { - properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputFormat); + properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputFormat); } if (outputTimezone != null) { - properties.setProperty(TimestampBasedKeyGenerator.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, outputTimezone); + properties.setProperty(TimestampBasedAvroKeyGenerator.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, outputTimezone); } return properties; } @@ -213,7 +213,7 @@ public void test_ExpectsMatch_SingleInputFormat_ISO8601WithMsZ_OutputTimezoneAsU "", "yyyyMMddHH", "GMT"); - KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); + BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); HoodieKey hk1 = keyGen.getKey(baseRecord); Assertions.assertEquals("2020040113", hk1.getPartitionPath()); @@ -231,7 +231,7 @@ public void test_ExpectsMatch_SingleInputFormats_ISO8601WithMsZ_OutputTimezoneAs "", "yyyyMMddHH", ""); - KeyGenerator 
keyGen = new TimestampBasedKeyGenerator(properties); + BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); HoodieKey hk1 = keyGen.getKey(baseRecord); Assertions.assertEquals("2020040113", hk1.getPartitionPath()); @@ -249,7 +249,7 @@ public void test_ExpectsMatch_MultipleInputFormats_ISO8601WithMsZ_OutputTimezone "", "yyyyMMddHH", "UTC"); - KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); + BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); HoodieKey hk1 = keyGen.getKey(baseRecord); Assertions.assertEquals("2020040113", hk1.getPartitionPath()); @@ -267,7 +267,7 @@ public void test_ExpectsMatch_MultipleInputFormats_ISO8601NoMsZ_OutputTimezoneAs "", "yyyyMMddHH", "UTC"); - KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); + BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); HoodieKey hk1 = keyGen.getKey(baseRecord); Assertions.assertEquals("2020040113", hk1.getPartitionPath()); @@ -285,7 +285,7 @@ public void test_ExpectsMatch_MultipleInputFormats_ISO8601NoMsWithOffset_OutputT "", "yyyyMMddHH", "UTC"); - KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); + BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); HoodieKey hk1 = keyGen.getKey(baseRecord); Assertions.assertEquals("2020040118", hk1.getPartitionPath()); @@ -303,7 +303,7 @@ public void test_ExpectsMatch_MultipleInputFormats_ISO8601WithMsWithOffset_Outpu "", "yyyyMMddHH", "UTC"); - KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); + BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); HoodieKey hk1 = keyGen.getKey(baseRecord); Assertions.assertEquals("2020040118", hk1.getPartitionPath()); @@ -321,7 +321,7 @@ public void test_ExpectsMatch_MultipleInputFormats_ISO8601WithMsZ_OutputTimezone "", "yyyyMMddHH", "EST"); - KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); + BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); HoodieKey hk1 = keyGen.getKey(baseRecord); Assertions.assertEquals("2020040109", hk1.getPartitionPath()); @@ -339,11 +339,11 @@ public void test_Throws_MultipleInputFormats_InputDateNotMatchingFormats() throw "", "yyyyMMddHH", "UTC"); - KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); - Assertions.assertThrows(HoodieDeltaStreamerException.class, () -> keyGen.getKey(baseRecord)); + BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); + Assertions.assertThrows(HoodieKeyGeneratorException.class, () -> keyGen.getKey(baseRecord)); baseRow = genericRecordToRow(baseRecord); - Assertions.assertThrows(HoodieDeltaStreamerException.class, () -> keyGen.getPartitionPath(baseRow)); + Assertions.assertThrows(HoodieKeyGeneratorException.class, () -> keyGen.getPartitionPath(baseRow)); } @Test @@ -356,7 +356,7 @@ public void test_ExpectsMatch_MultipleInputFormats_ShortDate_OutputCustomDate() "UTC", "MM/dd/yyyy", "UTC"); - KeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); + BuiltinKeyGenerator keyGen = new TimestampBasedKeyGenerator(properties); HoodieKey hk1 = keyGen.getKey(baseRecord); Assertions.assertEquals("04/01/2020", hk1.getPartitionPath()); diff --git a/hudi-spark/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java similarity index 100% rename from hudi-spark/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java rename to 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/KeyGeneratorTestUtilities.java diff --git a/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java b/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java index b3ed7ae5476c6..c820ebef43a5a 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java +++ b/hudi-spark/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java @@ -28,7 +28,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.keygen.KeyGenerator; +import org.apache.hudi.keygen.BuiltinKeyGenerator; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -72,7 +72,7 @@ public static Dataset prepareHoodieDatasetForBulkInsert(SQLContext sqlConte TypedProperties properties = new TypedProperties(); properties.putAll(config.getProps()); String keyGeneratorClass = properties.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY()); - KeyGenerator keyGenerator = (KeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties); + BuiltinKeyGenerator keyGenerator = (BuiltinKeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties); StructType structTypeForUDF = rows.schema(); sqlContext.udf().register(RECORD_KEY_UDF_FN, (UDF1) keyGenerator::getRecordKey, DataTypes.StringType); diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala index ce940f79104ee..fc52b3851cd1b 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -23,6 +23,7 @@ import org.apache.hudi.common.model.WriteOperationType import org.apache.hudi.hive.HiveSyncTool import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor import org.apache.hudi.keygen.SimpleKeyGenerator +import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.log4j.LogManager /** @@ -213,14 +214,14 @@ object DataSourceWriteOptions { * the dot notation eg: `a.b.c` * */ - val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field" + val RECORDKEY_FIELD_OPT_KEY = KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY val DEFAULT_RECORDKEY_FIELD_OPT_VAL = "uuid" /** * Partition path field. Value to be used at the `partitionPath` component of `HoodieKey`. Actual * value ontained by invoking .toString() */ - val PARTITIONPATH_FIELD_OPT_KEY = "hoodie.datasource.write.partitionpath.field" + val PARTITIONPATH_FIELD_OPT_KEY = KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY val DEFAULT_PARTITIONPATH_FIELD_OPT_VAL = "partitionpath" /** @@ -228,10 +229,10 @@ object DataSourceWriteOptions { * If set true, the names of partition folders follow = format. 
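A minimal sketch of how these switches drive the Spark SimpleKeyGenerator introduced earlier in this patch, now that they are plain aliases for KeyGeneratorOptions constants (the field names are illustrative assumptions; with hive-style enabled the folder shape becomes "timestamp=<value>", matching the new TestComplexKeyGenerator happy-path expectation):

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

class HiveStylePartitioningSketch {
  static SimpleKeyGenerator hiveStyleKeyGen() {
    TypedProperties props = new TypedProperties();
    props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key");      // record key column (assumed)
    props.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_OPT_KEY, "timestamp"); // partition column (assumed)
    props.setProperty(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true");  // folders become "timestamp=<value>" instead of "<value>"
    return new SimpleKeyGenerator(props);
  }
}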
* By default false (the names of partition folders are only partition values) */ - val HIVE_STYLE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.hive_style_partitioning" - val DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL = "false" - val URL_ENCODE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.partitionpath.urlencode" - val DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL = "false" + val HIVE_STYLE_PARTITIONING_OPT_KEY = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_OPT_KEY + val DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL = KeyGeneratorOptions.DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL + val URL_ENCODE_PARTITIONING_OPT_KEY = KeyGeneratorOptions.URL_ENCODE_PARTITIONING_OPT_KEY + val DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL = KeyGeneratorOptions.DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL /** * Key generator class, that implements will extract the key out of incoming record * diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 32d6a09452c3f..148102480c3c5 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -23,7 +23,6 @@ import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.config.HoodieRealtimeConfig import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS - import org.apache.avro.Schema import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder} import org.apache.hadoop.conf.Configuration diff --git a/hudi-spark/src/test/java/TestComplexKeyGenerator.java b/hudi-spark/src/test/java/TestComplexKeyGenerator.java deleted file mode 100644 index a5a88c2d7dba5..0000000000000 --- a/hudi-spark/src/test/java/TestComplexKeyGenerator.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-import static junit.framework.TestCase.assertEquals;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.keygen.ComplexKeyGenerator;
-import org.junit.jupiter.api.Test;
-
-public class TestComplexKeyGenerator {
-
-  @Test
-  public void testSingleValueKeyGenerator() {
-    TypedProperties properties = new TypedProperties();
-    properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
-    properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
-    ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
-    assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 1);
-    assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 1);
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
-    String rowKey = record.get("_row_key").toString();
-    String partitionPath = record.get("timestamp").toString();
-    HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
-    assertEquals("_row_key:" + rowKey, hoodieKey.getRecordKey());
-    assertEquals(partitionPath, hoodieKey.getPartitionPath());
-  }
-
-  @Test
-  public void testMultipleValueKeyGenerator() {
-    TypedProperties properties = new TypedProperties();
-    properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,timestamp");
-    properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "rider,driver");
-    ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
-    assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 2);
-    assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 2);
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
-    String rowKey =
-        "_row_key" + ComplexKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("_row_key").toString() + ","
-            + "timestamp" + ComplexKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString();
-    String partitionPath = record.get("rider").toString() + "/" + record.get("driver").toString();
-    HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
-    assertEquals(rowKey, hoodieKey.getRecordKey());
-    assertEquals(partitionPath, hoodieKey.getPartitionPath());
-  }
-
-  @Test
-  public void testMultipleValueKeyGeneratorNonPartitioned() {
-    TypedProperties properties = new TypedProperties();
-    properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key,timestamp");
-    properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "");
-    ComplexKeyGenerator compositeKeyGenerator = new ComplexKeyGenerator(properties);
-    assertEquals(compositeKeyGenerator.getRecordKeyFields().size(), 2);
-    assertEquals(compositeKeyGenerator.getPartitionPathFields().size(), 0);
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    GenericRecord record = dataGenerator.generateGenericRecords(1).get(0);
-    String rowKey =
-        "_row_key" + ComplexKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("_row_key").toString() + ","
-            + "timestamp" + ComplexKeyGenerator.DEFAULT_RECORD_KEY_SEPARATOR + record.get("timestamp").toString();
-    String partitionPath = "";
-    HoodieKey hoodieKey = compositeKeyGenerator.getKey(record);
-    assertEquals(rowKey, hoodieKey.getRecordKey());
-    assertEquals(partitionPath, hoodieKey.getPartitionPath());
-  }
-
-}
\ No newline at end of file
diff --git a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java b/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
deleted file mode 100644
index 4c5ded3089a72..0000000000000
--- a/hudi-spark/src/test/java/org/apache/hudi/keygen/TestComplexKeyGenerator.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.keygen;
-
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.exception.HoodieKeyException;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.testutils.KeyGeneratorTestUtilities;
-import org.apache.spark.sql.Row;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class TestComplexKeyGenerator extends KeyGeneratorTestUtilities {
-
-  private TypedProperties getCommonProps(boolean getComplexRecordKey) {
-    TypedProperties properties = new TypedProperties();
-    if (getComplexRecordKey) {
-      properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key, pii_col");
-    } else {
-      properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
-    }
-    properties.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true");
-    return properties;
-  }
-
-  private TypedProperties getPropertiesWithoutPartitionPathProp() {
-    return getCommonProps(false);
-  }
-
-  private TypedProperties getPropertiesWithoutRecordKeyProp() {
-    TypedProperties properties = new TypedProperties();
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
-    return properties;
-  }
-
-  private TypedProperties getWrongRecordKeyFieldProps() {
-    TypedProperties properties = new TypedProperties();
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp");
-    properties.put(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_wrong_key");
-    return properties;
-  }
-
-  private TypedProperties getProps() {
-    TypedProperties properties = getCommonProps(true);
-    properties.put(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "timestamp,ts_ms");
-    return properties;
-  }
-
-  @Test
-  public void testNullPartitionPathFields() {
-    Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutPartitionPathProp()));
-  }
-
-  @Test
-  public void testNullRecordKeyFields() {
-    Assertions.assertThrows(IllegalArgumentException.class, () -> new ComplexKeyGenerator(getPropertiesWithoutRecordKeyProp()));
-  }
-
-  @Test
-  public void testWrongRecordKeyField() {
-    ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getWrongRecordKeyFieldProps());
-    Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.getRecordKey(getRecord()));
-    Assertions.assertThrows(HoodieKeyException.class, () -> keyGenerator.buildFieldPositionMapIfNeeded(KeyGeneratorTestUtilities.structType));
-  }
-
-  @Test
-  public void testHappyFlow() {
-    ComplexKeyGenerator keyGenerator = new ComplexKeyGenerator(getProps());
-    GenericRecord record = getRecord();
-    HoodieKey key = keyGenerator.getKey(record);
-    Assertions.assertEquals(key.getRecordKey(), "_row_key:key1,pii_col:pi");
-    Assertions.assertEquals(key.getPartitionPath(), "timestamp=4357686/ts_ms=2020-03-21");
-    Row row = KeyGeneratorTestUtilities.getRow(record);
-    Assertions.assertEquals(keyGenerator.getRecordKey(row), "_row_key:key1,pii_col:pi");
-    Assertions.assertEquals(keyGenerator.getPartitionPath(row), "timestamp=4357686/ts_ms=2020-03-21");
-  }
-}
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala b/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala
index 902359da7e400..e29944529b51b 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionHelper.scala
@@ -22,7 +22,6 @@ import java.time.LocalDate
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
-import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.scalatest.{FunSuite, Matchers}
 
@@ -43,7 +42,7 @@ class TestAvroConversionHelper extends FunSuite with Matchers {
 
   test("Logical type: date") {
     val schema = new Schema.Parser().parse(dateSchema)
-    val convertor = AvroConversionHelper.createConverterToRow(schema, convertAvroSchemaToStructType(schema))
+    val convertor = AvroConversionHelper.createConverterToRow(schema, AvroConversionUtils.convertAvroSchemaToStructType(schema))
 
     val dateOutputData = dateInputData.map(x => {
       val record = new GenericData.Record(schema) {{ put("date", x) }}
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala b/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
index 73e1f5df63dea..99e1297f6ae47 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
@@ -17,8 +17,6 @@
 
 package org.apache.hudi
 
-import java.util
-
 import org.apache.avro.generic.GenericRecord
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.TypedProperties
@@ -236,14 +234,29 @@
     assertEquals("name1", keyGen.getPartitionPath(baseRow))
   }
 
-  class UserDefinedKeyGenerator(props: TypedProperties) extends KeyGenerator(props) {
+  class UserDefinedKeyGenerator(props: TypedProperties) extends KeyGenerator(props) with SparkKeyGeneratorInterface {
     val recordKeyProp: String = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY)
     val partitionPathProp: String = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY)
+    val STRUCT_NAME: String = "hoodieRowTopLevelField"
+    val NAMESPACE: String = "hoodieRow"
+    var converterFn: Function1[Any, Any] = _
 
     override def getKey(record: GenericRecord): HoodieKey = {
       new HoodieKey(HoodieAvroUtils.getNestedFieldValAsString(record, recordKeyProp, true),
         HoodieAvroUtils.getNestedFieldValAsString(record, partitionPathProp, true))
     }
+
+    override def getRecordKey(row: Row): String = {
+      if (null == converterFn) converterFn = AvroConversionHelper.createConverterToAvro(row.schema, STRUCT_NAME, NAMESPACE)
+      val genericRecord = converterFn.apply(row).asInstanceOf[GenericRecord]
+      getKey(genericRecord).getRecordKey
+    }
+
+    override def getPartitionPath(row: Row): String = {
+      if (null == converterFn) converterFn = AvroConversionHelper.createConverterToAvro(row.schema, STRUCT_NAME, NAMESPACE)
+      val genericRecord = converterFn.apply(row).asInstanceOf[GenericRecord]
+      getKey(genericRecord).getPartitionPath
+    }
   }
 
   @Test def testComplexKeyGenerator() = {