diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java index 70cc9360a3374..39aa45c9344cf 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java @@ -20,6 +20,9 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.avro.JsonProperties; import org.apache.avro.Schema; @@ -28,6 +31,7 @@ import java.io.IOException; import java.util.Objects; +import java.util.Properties; /** * Default payload used for delta streamer. @@ -90,4 +94,35 @@ public Boolean overwriteField(Object value, Object defaultValue) { public Comparable getOrderingValue() { return this.orderingVal; } + + /** + * Returns whether the given record is newer than the record of this payload. + * + * @param orderingVal + * @param record The record + * @param prop The payload properties + * + * @return true if the given record is newer + */ + protected boolean isRecordNewer(Comparable orderingVal, IndexedRecord record, Properties prop) { + String orderingField = prop.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY); + if (!StringUtils.isNullOrEmpty(orderingField)) { + boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(prop.getProperty( + KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), + KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())); + + Comparable oldOrderingVal = + (Comparable) HoodieAvroUtils.getNestedFieldVal( + (GenericRecord) record, + orderingField, + true, + consistentLogicalTimestampEnabled); + + // pick the payload with greater ordering value as insert record + return oldOrderingVal != null + && ReflectionUtils.isSameClass(oldOrderingVal, orderingVal) + && oldOrderingVal.compareTo(orderingVal) > 0; + } + return false; + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java index 08c020f63cacb..02941343dc5b0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java @@ -20,9 +20,6 @@ import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ReflectionUtils; -import org.apache.hudi.common.util.StringUtils; -import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -52,7 +49,7 @@ * and returns a merged record. * * Illustration with simple data. - * let's say the order field is 'ts' and schema is : + * let's say the order field is 'ts' and schema is: * { * [ * {"name":"id","type":"string"}, @@ -192,35 +189,4 @@ protected Option mergeDisorderRecordsWithMetadata( return Option.of(builder.build()); } } - - /** - * Returns whether the given record is newer than the record of this payload. 
- * - * @param orderingVal - * @param record The record - * @param prop The payload properties - * - * @return true if the given record is newer - */ - private static boolean isRecordNewer(Comparable orderingVal, IndexedRecord record, Properties prop) { - String orderingField = prop.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY); - if (!StringUtils.isNullOrEmpty(orderingField)) { - boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(prop.getProperty( - KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), - KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())); - - Comparable oldOrderingVal = - (Comparable) HoodieAvroUtils.getNestedFieldVal( - (GenericRecord) record, - orderingField, - true, - consistentLogicalTimestampEnabled); - - // pick the payload with greater ordering value as insert record - return oldOrderingVal != null - && ReflectionUtils.isSameClass(oldOrderingVal, orderingVal) - && oldOrderingVal.compareTo(orderingVal) > 0; - } - return false; - } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/RecordCountAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/RecordCountAvroPayload.java new file mode 100644 index 0000000000000..2b0a8512bc2c2 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/RecordCountAvroPayload.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; + +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Properties; + +/** + * Payload clazz that is used for pv/uv. + * In order to use 'RecordCountAvroPayload', we need to add field [hoodie_record_count bigint] + * to the schema when creating the hudi table to record the result of pv/uv, the field 'hoodie_record_count' + * does not need to be filled in, flink will automatically set 'hoodie_record_count' to 'null', + * and will update 'null' to '1' in #getInsertValue and #mergeOldRecord. + * + *
<p>Simplified pv/uv calculation Logic:
+ * <pre>
+ *  1. #preCombine
+ *  For records with the same record key in one batch
+ *  or in the delta logs that belong to the same File Group,
+ *  sum their 'hoodie_record_count' field values and
+ *  keep the record with the larger ordering value.
+ *
+ *  2. #combineAndGetUpdateValue
+ *  For every incoming record with an existing record in storage (same record key),
+ *  sum their 'hoodie_record_count' field values, keep the record
+ *  with the larger ordering value, and return the merged record.
+ *
+ *  Illustration with simple data.
+ *  let's say the order field is 'ts' and schema is:
+ *  {
+ *    [
+ *      {"name":"id","type":"string"},
+ *      {"name":"ts","type":"long"},
+ *      {"name":"name","type":"string"},
+ *      {"name":"hoodie_record_count","type":"long"}
+ *    ]
+ *  }
+ *
+ *  case 1
+ *  Current data:
+ *      id      ts      name    hoodie_record_count
+ *      1       1       name_1  1
+ *  Insert data:
+ *      id      ts      name    hoodie_record_count
+ *      1       2       name_2  2
+ *
+ *  Result data after #preCombine or #combineAndGetUpdateValue:
+ *      id      ts      name    hoodie_record_count
+ *      1       2       name_2  3
+ *
+ *  case 2
+ *  Current data:
+ *      id      ts      name    hoodie_record_count
+ *      1       2       name_1  null
+ *  Insert data:
+ *      id      ts      name    hoodie_record_count
+ *      1       1       name_2  1
+ *
+ *  Result data after #preCombine or #combineAndGetUpdateValue:
+ *      id      ts      name    hoodie_record_count
+ *      1       2       name_1  2
+ * </pre>
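+ *
+ * <p>A minimal sketch of case 1 above in code form (the variable names 'recordTs1', 'recordTs2',
+ * 'schema' and 'props' are placeholders, not part of this class; exception handling omitted):
+ * <pre>{@code
+ *   // recordTs1: ts = 1, hoodie_record_count = 1; recordTs2: ts = 2, hoodie_record_count = 2
+ *   RecordCountAvroPayload older = new RecordCountAvroPayload(recordTs1, 1L);
+ *   RecordCountAvroPayload newer = new RecordCountAvroPayload(recordTs2, 2L);
+ *   GenericRecord merged = (GenericRecord) older.preCombine(newer, schema, props)
+ *       .getInsertValue(schema).get();
+ *   merged.get("hoodie_record_count"); // 3: counts are summed, the ts=2 record supplies the other fields
+ * }</pre>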
+ */ +public class RecordCountAvroPayload extends OverwriteWithLatestAvroPayload { + private static final Logger LOG = LogManager.getLogger(RecordCountAvroPayload.class); + private static final String DEFAULT_RECORD_COUNT_FIELD_VAL = "hoodie_record_count"; + + public RecordCountAvroPayload(GenericRecord record, Comparable orderingVal) { + super(record, orderingVal); + } + + public RecordCountAvroPayload(Option record) { + super(record); // natural order + } + + @Override + public RecordCountAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue, Schema schema, Properties properties) { + if (oldValue.recordBytes.length == 0) { + // use natural order for delete record + return this; + } + // pick the payload with greater ordering value as insert record + final boolean shouldPickOldRecord = oldValue.orderingVal.compareTo(orderingVal) > 0 ? true : false; + try { + GenericRecord oldRecord = HoodieAvroUtils.bytesToAvro(oldValue.recordBytes, schema); + // Get the record that needs to be stored. + Option mergedRecord = mergeOldRecord(oldRecord, schema, shouldPickOldRecord); + if (mergedRecord.isPresent()) { + return new RecordCountAvroPayload((GenericRecord) mergedRecord.get(), + shouldPickOldRecord ? oldValue.orderingVal : this.orderingVal); + } + } catch (Exception e) { + LOG.warn("RecordCountAvroPayload will use the current value, Exception is: " + e); + return this; + } + return this; + } + + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException { + return this.mergeOldRecord(currentValue, schema, false); + } + + @Override + public Option combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties prop) + throws IOException { + return mergeOldRecord(currentValue, schema, isRecordNewer(orderingVal, currentValue, prop)); + } + + @Override + public Option getInsertValue(Schema schema) throws IOException { + if (recordBytes.length == 0) { + return Option.empty(); + } + IndexedRecord indexedRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema); + if (isDeleteRecord((GenericRecord) indexedRecord)) { + return Option.empty(); + } else { + try { + // Flink automatically set 'hoodie_record_count' to 'null', here updated to 1, so that the query result is 1. + if (((GenericRecord) indexedRecord).get(DEFAULT_RECORD_COUNT_FIELD_VAL) == null) { + ((GenericRecord) indexedRecord).put(DEFAULT_RECORD_COUNT_FIELD_VAL, 1L); + } + } catch (AvroRuntimeException e) { + throw new HoodieException( + String.format("When using RecordCountAvroPayload, an additional column (hoodie_record_count bigint) needs to be added to the source schema," + + "current schema is [%s].", schema.toString()), e); + } + return Option.of(indexedRecord); + } + } + + /** + * Get the latest delta record and update the 'hoodie_record_count' field. + * + * @param currentValue The current record from file + * @param schema The record schema + * @param isBaseRecord Pick the payload with greatest ordering value as base record + * + * @return the merged record option. 
+ */ + private Option mergeOldRecord(IndexedRecord currentValue, Schema schema, boolean isBaseRecord) throws IOException { + Option recordOption = getInsertValue(schema); + if (!recordOption.isPresent()) { + return Option.empty(); + } + final GenericRecord baseRecord; + final GenericRecord mergedRecord; + if (isBaseRecord) { + baseRecord = (GenericRecord) currentValue; + mergedRecord = (GenericRecord) recordOption.get(); + } else { + baseRecord = (GenericRecord) recordOption.get(); + mergedRecord = (GenericRecord) currentValue; + } + if (isDeleteRecord(baseRecord)) { + return Option.empty(); + } + + try { + // When adding, 'null' represents '1' + long currentRecordCount = mergedRecord.get(DEFAULT_RECORD_COUNT_FIELD_VAL) == null ? 1L : (long) mergedRecord.get(DEFAULT_RECORD_COUNT_FIELD_VAL); + long insertRecordCount = baseRecord.get(DEFAULT_RECORD_COUNT_FIELD_VAL) == null ? 1L : (long) baseRecord.get(DEFAULT_RECORD_COUNT_FIELD_VAL); + baseRecord.put(DEFAULT_RECORD_COUNT_FIELD_VAL, currentRecordCount + insertRecordCount); + } catch (AvroRuntimeException e) { + throw new HoodieException( + String.format("When using RecordCountAvroPayload, an additional column (hoodie_record_count bigint) needs to be added to the source schema," + + "current schema is [%s].", schema.toString()), e); + } + + return Option.of(baseRecord); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestRecordCountAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestRecordCountAvroPayload.java new file mode 100644 index 0000000000000..231e5a4fe3e4c --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestRecordCountAvroPayload.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.hudi.common.util.Option; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Properties; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +/** + * Unit tests {@link TestRecordCountAvroPayload}. 
+ */ +public class TestRecordCountAvroPayload { + private Schema schema; + + private Properties properties = new Properties(); + String jsonSchema = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n" + + " \"fields\": [\n" + + " {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"partition\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n" + + " {\"name\": \"_hoodie_is_deleted\", \"type\": [\"null\", \"boolean\"]},\n" + + " {\"name\": \"hoodie_record_count\", \"type\": [\"null\", \"long\"]},\n" + + " {\"name\": \"city\", \"type\": [\"null\", \"string\"]},\n" + + " {\"name\": \"child\", \"type\": [\"null\", {\"type\": \"array\", \"items\": \"string\"}]}\n" + + " ]\n" + + "}"; + + @BeforeEach + public void setUp() throws Exception { + schema = new Schema.Parser().parse(jsonSchema); + properties = new Properties(); + properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, "ts"); + } + + @Test + public void testActiveRecords() throws IOException { + GenericRecord record1 = new GenericData.Record(schema); + record1.put("id", "1"); + record1.put("partition", "partition1"); + record1.put("ts", 1L); + record1.put("_hoodie_is_deleted", false); + record1.put("hoodie_record_count", null); + record1.put("city", "NY0"); + record1.put("child", Arrays.asList("A")); + + GenericRecord record2 = new GenericData.Record(schema); + record2.put("id", "1"); + record2.put("partition", "partition1"); + record2.put("ts", 2L); + record2.put("_hoodie_is_deleted", false); + record2.put("hoodie_record_count", 3L); + record2.put("city", null); + record2.put("child", Arrays.asList("B")); + + GenericRecord record3 = new GenericData.Record(schema); + record3.put("id", "1"); + record3.put("partition", "partition1"); + record3.put("ts", 2L); + record3.put("_hoodie_is_deleted", false); + record3.put("hoodie_record_count", 4L); + record3.put("city", "NY0"); + record3.put("child", Arrays.asList("A")); + + GenericRecord record4 = new GenericData.Record(schema); + record4.put("id", "1"); + record4.put("partition", "partition1"); + record4.put("ts", 2L); + record4.put("_hoodie_is_deleted", false); + record4.put("hoodie_record_count", 4L); + record4.put("city", null); + record4.put("child", Arrays.asList("B")); + + // Test preCombine: payload2's ordering val is larger + RecordCountAvroPayload payload1 = new RecordCountAvroPayload(record1, 1L); + RecordCountAvroPayload payload2 = new RecordCountAvroPayload(record2, 2L); + assertArrayEquals(payload1.preCombine(payload2, schema, properties).recordBytes, new RecordCountAvroPayload(record4, 2).recordBytes); + assertArrayEquals(payload2.preCombine(payload1, schema, properties).recordBytes, new RecordCountAvroPayload(record4, 2).recordBytes); + + // 'hoodie_record_count' field value is 'null' and represents 1 + record1.put("hoodie_record_count", 1L); + assertEquals(record1, payload1.getInsertValue(schema).get()); + assertEquals(record2, payload2.getInsertValue(schema).get()); + + // Test combineAndGetUpdateValue: payload2's ordering val is larger + assertEquals(payload1.combineAndGetUpdateValue(record2, schema, properties).get(), record4); + assertEquals(payload2.combineAndGetUpdateValue(record1, schema, properties).get(), record4); + + // Test preCombine again: let payload1's ordering val larger than payload2 + record1.put("ts", 2L); + record2.put("ts", 1L); + // reset value + record2.put("hoodie_record_count", 3L); + payload1 = new 
RecordCountAvroPayload(record1, 2); + payload2 = new RecordCountAvroPayload(record2, 1); + assertArrayEquals(payload1.preCombine(payload2, schema, properties).recordBytes, new RecordCountAvroPayload(record3, 2).recordBytes); + assertArrayEquals(payload2.preCombine(payload1, schema, properties).recordBytes, new RecordCountAvroPayload(record3, 2).recordBytes); + } + + @Test + public void testDeletedRecord() throws IOException { + GenericRecord record1 = new GenericData.Record(schema); + record1.put("id", "1"); + record1.put("partition", "partition0"); + record1.put("ts", 1L); + record1.put("_hoodie_is_deleted", false); + record1.put("hoodie_record_count", null); + record1.put("city", "NY0"); + record1.put("child", Collections.emptyList()); + + GenericRecord delRecord1 = new GenericData.Record(schema); + delRecord1.put("id", "2"); + delRecord1.put("partition", "partition1"); + delRecord1.put("ts", 2L); + delRecord1.put("_hoodie_is_deleted", true); + delRecord1.put("hoodie_record_count", 1L); + delRecord1.put("city", "NY0"); + delRecord1.put("child", Collections.emptyList()); + + RecordCountAvroPayload payload1 = new RecordCountAvroPayload(record1, 1L); + RecordCountAvroPayload payload2 = new RecordCountAvroPayload(delRecord1, 2L); + assertEquals(payload1.preCombine(payload2, schema, properties), payload1); + assertEquals(payload2.preCombine(payload1, schema, properties), payload2); + + // 'hoodie_record_count' field value is 'null' and represents 1 + record1.put("hoodie_record_count", 1L); + assertEquals(record1, payload1.getInsertValue(schema).get()); + assertFalse(payload2.getInsertValue(schema).isPresent()); + + assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema, properties), Option.empty()); + assertFalse(payload2.combineAndGetUpdateValue(record1, schema, properties).isPresent()); + } +}