HoodieAvroUtils.java
@@ -18,6 +18,7 @@

package org.apache.hudi.avro;

import java.util.Arrays;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
@@ -27,6 +28,7 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.exception.SchemaCompatibilityException;

import org.apache.avro.AvroRuntimeException;
@@ -79,7 +81,6 @@
import java.util.TimeZone;
import java.util.stream.Collectors;

import static org.apache.avro.Schema.Type.UNION;
import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.resolveUnionSchema;
@@ -108,6 +109,8 @@ public class HoodieAvroUtils {

public static final Schema RECORD_KEY_SCHEMA = initRecordKeySchema();

private static final String FIELD_LOCATION_DELIMITER = "\\.";

/**
* Convert a given avro record to bytes.
*/
@@ -1033,7 +1036,7 @@ public static int fromJavaDate(Date date) {

private static Schema getActualSchemaFromUnion(Schema schema, Object data) {
Schema actualSchema;
- if (!schema.getType().equals(UNION)) {
+ if (!schema.getType().equals(Schema.Type.UNION)) {
return schema;
}
if (schema.getTypes().size() == 2
@@ -1064,4 +1067,62 @@ public static boolean gteqAvro1_9() {
public static boolean gteqAvro1_10() {
return VersionUtil.compareVersions(AVRO_VERSION, "1.10") >= 0;
}

/**
* Given an Avro schema, returns the field specified by the fieldLocation parameter.
* fieldLocation is a dot-separated path specifying the location of the (possibly nested) field to retrieve.
* For example, "field1.nestedField1" takes field "field1" and retrieves "nestedField1" from it.
* @param schema the schema to retrieve the field from
* @param fieldLocation the dot-separated location of the field
* @return the field, wrapped in an Option
*/
public static Option<Field> getField(Schema schema, String fieldLocation) {
if (schema == null || fieldLocation == null || fieldLocation.isEmpty()) {
return Option.empty();
}

List<String> pathList = Arrays.asList(fieldLocation.split(FIELD_LOCATION_DELIMITER));
pathList.stream()
.filter(pl -> pl.trim().isEmpty())
.findAny()
.ifPresent(f -> {
throw new HoodieValidationException("Invalid fieldLocation: " + fieldLocation);
});
if (pathList.size() == 0) {
return Option.empty();
}

return getFieldHelper(schema, pathList, 0);
}

/**
* Helper method that does the actual work for {@link #getField(Schema, String)} by recursively finding the required field.
*
* @param schema schema to be evaluated at the current level of recursion
* @param pathList path of the field to find, built in traversal order from parent to child
* @param field index of the current path component in pathList
* @return the field, wrapped in an Option
*/
private static Option<Field> getFieldHelper(Schema schema, List<String> pathList, int field) {
Field curField = schema.getField(pathList.get(field));
if (curField == null) {
// the current path component does not exist in this schema
return Option.empty();
}
Schema fieldSchema = curField.schema();

if (pathList.size() == field + 1 && curField.name().equals(pathList.get(field))) {
return Option.of(curField);
}

switch (fieldSchema.getType()) {
case UNION:
// assume UNION is strictly nullable
return getFieldHelper(resolveNullableSchema(fieldSchema), pathList, ++field);
case MAP:
return getFieldHelper(fieldSchema.getValueType(), pathList, ++field);
case RECORD:
return getFieldHelper(fieldSchema, pathList, ++field);
case ARRAY:
return getFieldHelper(fieldSchema.getElementType(), pathList, ++field);
default:
return Option.empty();
}
}
}
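For illustration, a minimal usage sketch of the new helper; the schema below is hypothetical, not one from the Hudi codebase:

import org.apache.avro.Schema;
import org.apache.hudi.common.util.Option;

// Hypothetical schema: a "user" record nested inside the top-level record.
Schema schema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Top\",\"fields\":["
    + "{\"name\":\"user\",\"type\":{\"type\":\"record\",\"name\":\"User\",\"fields\":["
    + "{\"name\":\"name\",\"type\":\"string\"}]}}]}");

// "user.name" is split on "." and resolved level by level: "user" is found in
// the top-level record, then "name" is found inside the nested record.
Option<Schema.Field> field = HoodieAvroUtils.getField(schema, "user.name");
// field.isPresent() == true, and field.get().name() is "name"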
TestHoodieAvroUtils.java
@@ -18,9 +18,12 @@

package org.apache.hudi.avro;

import org.apache.avro.Schema.Field;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.exception.SchemaCompatibilityException;

import org.apache.avro.AvroRuntimeException;
@@ -43,6 +46,7 @@

import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -426,4 +430,30 @@ public void testConvertDaysToDate() {
int days = HoodieAvroUtils.fromJavaDate(now);
assertEquals(now.toLocalDate(), HoodieAvroUtils.toJavaDate(days).toLocalDate());
}

@Test
public void testGetField() {
Schema nestedSchema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD);

// null schema should return empty option
Option<Field> nullSchemaTest = HoodieAvroUtils.getField(null, "nestedField");
assertFalse(nullSchemaTest.isPresent());

// null fieldLocation should return empty option
Option<Field> nullFieldLocationTest = HoodieAvroUtils.getField(nestedSchema, null);
assertFalse(nullFieldLocationTest.isPresent());

// empty fieldLocation should return empty option
Option<Field> emptyFieldLocationTest = HoodieAvroUtils.getField(nestedSchema, "");
assertFalse(emptyFieldLocationTest.isPresent());

// invalid fieldLocation should throw error
assertThrows(HoodieValidationException.class, () -> HoodieAvroUtils.getField(nestedSchema, ".firstname"));

Option<Field> topLevelFieldTest = HoodieAvroUtils.getField(nestedSchema, "firstname");
assertTrue(topLevelFieldTest.isPresent());

Option<Field> nestedFieldTest = HoodieAvroUtils.getField(nestedSchema, "student.lastname");
assertTrue(nestedFieldTest.isPresent());
}
}
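For context, SCHEMA_WITH_NESTED_FIELD is defined elsewhere in this test class; the assertions above only require a shape roughly like the following sketch (not the repository's actual definition):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// Sketch: a top-level "firstname" field plus a nested "student" record
// containing "lastname", matching the paths exercised in testGetField.
Schema student = SchemaBuilder.record("Student").fields()
    .requiredString("lastname")
    .endRecord();
Schema schemaWithNestedField = SchemaBuilder.record("Person").fields()
    .requiredString("firstname")
    .name("student").type(student).noDefault()
    .endRecord();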
HoodieTableFactory.java
@@ -18,6 +18,8 @@

package org.apache.hudi.table;

import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
@@ -120,6 +122,7 @@ public Set<ConfigOption<?>> optionalOptions() {
*/
private void sanityCheck(Configuration conf, ResolvedSchema schema) {
List<String> fields = schema.getColumnNames();
Schema inferredSchema = AvroSchemaConverter.convertToSchema(schema.toPhysicalRowDataType().notNull().getLogicalType());

Review comment (Contributor):
There is no need to convert the ResolvedSchema to an Avro schema for validation; ResolvedSchema#getColumnDataTypes can fetch the data type of each field.
Also, we need to fix RowDataKeyGen#getRecordKey for nested primary keys.

Reply (@voonhous, Member, Author — Dec 2, 2022):
Sure, I will take a look. The main reasons for doing this are:
  1. AvroSchemaConverter.convertToSchema was already imported and used elsewhere in the code, so it is simply reused here.
  2. Converting to an Avro schema lets the helper functions live in HoodieAvroUtils, where the validation for table creation through the Spark entrypoint can be reused.
Let me try to see if we can use the ResolvedSchema instead; I will get back to you.
> Also we need to fix the RowDataKeyGen#getRecordKey for nested primary keys.
Got it!
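For reference, a minimal sketch of the reviewer's suggested alternative — resolving a dot-separated path directly against Flink's ResolvedSchema instead of converting to Avro first. The helper name hasNestedField is hypothetical, not part of this PR:

import java.util.List;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

// Hypothetical helper: validates a (possibly nested) field path such as
// "f0.id" against a ResolvedSchema, walking ROW types level by level.
static boolean hasNestedField(ResolvedSchema schema, String fieldLocation) {
  String[] path = fieldLocation.split("\\.");
  int idx = schema.getColumnNames().indexOf(path[0]);
  if (idx < 0) {
    return false;
  }
  LogicalType type = schema.getColumnDataTypes().get(idx).getLogicalType();
  for (int i = 1; i < path.length; i++) {
    if (!(type instanceof RowType)) {
      return false; // intermediate path component is not a nested row
    }
    RowType rowType = (RowType) type;
    int child = rowType.getFieldNames().indexOf(path[i]);
    if (child < 0) {
      return false;
    }
    type = rowType.getTypeAt(child);
  }
  return true;
}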

// validate record key in pk absence.
if (!schema.getPrimaryKey().isPresent()) {
@@ -132,7 +135,7 @@ private void sanityCheck(Configuration conf, ResolvedSchema schema) {
}

Arrays.stream(recordKeys)
- .filter(field -> !fields.contains(field))
+ .filter(field -> !HoodieAvroUtils.getField(inferredSchema, field).isPresent())
.findAny()
Review comment (Contributor):
Do we support nested primary keys in this patch too?

Reply (@voonhous, Member, Author — Oct 11, 2022):
From what I understand, Spark SQL supports nested primaryKey and precombineField. The changes I made here standardize the validations between Spark and Flink:
https://issues.apache.org/jira/browse/HUDI-4051
https://github.com/apache/hudi/pull/5517/files

.ifPresent(f -> {
throw new HoodieValidationException("Field '" + f + "' specified in option "
@@ -142,7 +145,7 @@ private void sanityCheck(Configuration conf, ResolvedSchema schema) {

// validate pre_combine key
String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD);
- if (!fields.contains(preCombineField)) {
+ if (!HoodieAvroUtils.getField(inferredSchema, preCombineField).isPresent()) {
if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) {
Review comment (Contributor):
We may also need to validate the nested field names.

Reply (@voonhous, Member, Author):
I agree. This change is to standardize the validations between Flink and Spark; as such, no checks on the nested field names were made.

Follow-up (@voonhous, Member, Author):
@danny0405 I have added the feature to validate nested fields for Flink. Can you please help review this PR again? Thank you.

throw new HoodieValidationException("Option '" + FlinkOptions.PRECOMBINE_FIELD.key()
+ "' is required for payload class: " + DefaultHoodieRecordPayload.class.getName());
TestHoodieTableFactory.java
@@ -165,6 +165,35 @@ void testRequiredOptionsForSource() {

assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext6));
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext6));

// nested pk field is allowed
ResolvedSchema schema6 = SchemaBuilder.instance()
.field("f0",
DataTypes.ROW(DataTypes.FIELD("id", DataTypes.INT()), DataTypes.FIELD("date", DataTypes.VARCHAR(20))))
Review comment (Contributor):
Can we write an IT test in ITTestHoodieDataSource?

Reply (@voonhous, Member, Author — Oct 11, 2022):
Hmmm, do you mean add IT tests, or remove the UTs I included here and rewrite them as IT tests?
.field("f1", DataTypes.VARCHAR(20))
.field("f2", DataTypes.TIMESTAMP(3))
.field("ts", DataTypes.TIMESTAMP(3))
.build();
this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "f0.id");
this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, "f2");
final MockContext sourceContext7 = MockContext.getInstance(this.conf, schema6, "f2");

assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext7));
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext7));

// nested precombine field is allowed
ResolvedSchema schema7 = SchemaBuilder.instance()
.field("f0", DataTypes.INT().notNull())
.field("f1", DataTypes.VARCHAR(20))
.field("f2", DataTypes.TIMESTAMP(3))
.field("ts", DataTypes.ROW(DataTypes.FIELD("year", DataTypes.INT()), DataTypes.FIELD("MONTH", DataTypes.INT())))
.build();
this.conf.setString(FlinkOptions.RECORD_KEY_FIELD, "f0");
this.conf.setString(FlinkOptions.PRECOMBINE_FIELD, "f2.year");
final MockContext sourceContext8 = MockContext.getInstance(this.conf, schema7, "f2");

assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSource(sourceContext8));
assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext8));
}

@Test