From 117a7dd824d4f15b0dd56eda858eda35feb960d4 Mon Sep 17 00:00:00 2001
From: wangxianghu
Date: Tue, 8 Mar 2022 23:21:07 +0400
Subject: [PATCH 1/3] [HUDI-3547] Introduce MaxwellJsonKafkaSourcePostProcessor
 to extract data from Maxwell json string

---
 .../hudi/common/util/DateTimeUtils.java       |  17 ++
 .../MaxwellJsonKafkaSourcePostProcessor.java  | 186 ++++++++++++++++++
 .../maxwell/PreCombineFieldType.java          |  44 +++++
 .../TestJsonKafkaSourcePostProcessor.java     |   6 +
 .../maxwell-json-string.json                  |   3 +
 5 files changed, 256 insertions(+)
 create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
 create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/PreCombineFieldType.java
 create mode 100644 hudi-utilities/src/test/resources/maxwell-post-processor/maxwell-json-string.json

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
index e52e5660957dd..531a0903f66fc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
@@ -21,6 +21,9 @@
 
 import java.time.Duration;
 import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeParseException;
 import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
@@ -126,6 +129,20 @@ private static Map<String, ChronoUnit> initMap() {
     return labelToUnit;
   }
 
+  /**
+   * Converts a Unix timestamp (in seconds) to a date-time string in the given format.
+   *
+   * @param unixTimestamp Unix timestamp in seconds
+   * @param timeFormat    target time format, e.g. 'yyyy-MM-dd HH:mm:ss'
+   */
+  public static String formatUnixTimestamp(long unixTimestamp, String timeFormat) {
+    ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(timeFormat));
+    DateTimeFormatter dtf = DateTimeFormatter.ofPattern(timeFormat);
+    return LocalDateTime
+        .ofInstant(Instant.ofEpochSecond(unixTimestamp), ZoneId.systemDefault())
+        .format(dtf);
+  }
+
   /**
    * Enum which defines time unit, mostly used to parse value from configuration file.
    */
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
new file mode 100644
index 0000000000000..5bdb8381e6672
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.processor.maxwell;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.DateTimeUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
+import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.Locale;
+import java.util.Objects;
+import java.util.regex.Pattern;
+
+import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.DATE_STRING;
+import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.EPOCHMILLISECONDS;
+import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.NON_TIMESTAMP;
+import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.UNIX_TIMESTAMP;
+import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.valueOf;
+
+/**
+ * A {@link JsonKafkaSourcePostProcessor} that helps extract fresh data from a Maxwell JSON string and tags each
+ * record as a delete or not.
+ */
+public class MaxwellJsonKafkaSourcePostProcessor extends JsonKafkaSourcePostProcessor {
+
+  private static final Logger LOG = LogManager.getLogger(MaxwellJsonKafkaSourcePostProcessor.class);
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  public MaxwellJsonKafkaSourcePostProcessor(TypedProperties props) {
+    super(props);
+  }
+
+  // ------------------------------------------------------------------------
+  //  Partial fields in maxwell json string
+  // ------------------------------------------------------------------------
+
+  private static final String DATABASE = "database";
+  private static final String TABLE = "table";
+  private static final String DATA = "data";
+  private static final String OPERATION_TYPE = "type";
+  private static final String TS = "ts";
+
+  // ------------------------------------------------------------------------
+  //  Operation types
+  // ------------------------------------------------------------------------
+
+  private static final String INSERT = "insert";
+  private static final String UPDATE = "update";
+  private static final String DELETE = "delete";
+
+  /**
+   * Configs to be passed for this processor.
+   */
+  public static class Config {
+    private static final ConfigProperty<String> DATABASE_NAME_REGEX_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.database.regex")
+        .noDefaultValue()
+        .withDocumentation("Database name regex.");
+
+    private static final ConfigProperty<String> TABLE_NAME_REGEX_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.table.regex")
+        .noDefaultValue()
+        .withDocumentation("Table name regex.");
+
+    private static final ConfigProperty<String> PRECOMBINE_FIELD_TYPE_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.precombine.field.type")
+        .defaultValue("DATE_STRING")
+        .withDocumentation("Data type of the preCombine field. Could be NON_TIMESTAMP, DATE_STRING, "
+            + "UNIX_TIMESTAMP or EPOCHMILLISECONDS. DATE_STRING by default.");
+
+    private static final ConfigProperty<String> PRECOMBINE_FIELD_FORMAT_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.precombine.field.format")
+        .defaultValue("yyyy-MM-dd HH:mm:ss")
+        .withDocumentation("When the preCombine field is in DATE_STRING format, the user should tell Hudi "
+            + "what format it is. 'yyyy-MM-dd HH:mm:ss' by default.");
+  }
+
+  @Override
+  public JavaRDD<String> process(JavaRDD<String> maxwellJsonRecords) {
+    return maxwellJsonRecords.map(record -> {
+      JsonNode inputJson = MAPPER.readTree(record);
+      String database = inputJson.get(DATABASE).toString();
+      String table = inputJson.get(TABLE).toString();
+
+      // filter out target databases and tables
+      if (isTargetTable(database, table)) {
+
+        LOG.info(String.format("Maxwell source processor starts to process table: %s.%s", database, table));
+
+        ObjectNode result = (ObjectNode) inputJson.get(DATA);
+        String type = inputJson.get(OPERATION_TYPE).toString();
+
+        // insert or update
+        if (INSERT.equals(type) || UPDATE.equals(type)) {
+          // tag this record as not deleted.
+          result.put(HoodieRecord.HOODIE_IS_DELETED, false);
+          return result.toString();
+
+          // delete
+        } else if (DELETE.equals(type)) {
+          // tag this record as delete.
+          result.put(HoodieRecord.HOODIE_IS_DELETED, true);
+
+          PreCombineFieldType preCombineFieldType =
+              valueOf(this.props.getString(Config.PRECOMBINE_FIELD_TYPE_PROP.key(),
+                  Config.PRECOMBINE_FIELD_TYPE_PROP.defaultValue()).toLowerCase(Locale.ROOT));
+
+          // maxwell won't update the `update_time`(delete time) field of the record which is tagged as delete. so if we
+          // want to delete this record correctly, we should update its `update_time` to a time closer to where the
+          // delete operation actually occurred. here we use `ts` from maxwell json string as this 'delete' time.
+
+          // we can update the `update_time`(delete time) only when it is in timestamp format.
+          if (!preCombineFieldType.equals(NON_TIMESTAMP)) {
+            String preCombineField = this.props.getString(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(),
+                HoodieWriteConfig.PRECOMBINE_FIELD_NAME.defaultValue());
+
+            // ts from maxwell
+            long ts = inputJson.get(TS).longValue();
+
+            // convert the `update_time`(delete time) to the proper format.
+            if (preCombineFieldType.equals(DATE_STRING)) {
+              // DATE_STRING format
+              String timeFormat = this.props.getString(Config.PRECOMBINE_FIELD_FORMAT_PROP.key(), Config.PRECOMBINE_FIELD_FORMAT_PROP.defaultValue());
+              result.put(preCombineField, DateTimeUtils.formatUnixTimestamp(ts, timeFormat));
+            } else if (preCombineFieldType.equals(EPOCHMILLISECONDS)) {
+              // EPOCHMILLISECONDS format
+              result.put(preCombineField, ts * 1000L);
+            } else if (preCombineFieldType.equals(UNIX_TIMESTAMP)) {
+              // UNIX_TIMESTAMP format
+              result.put(preCombineField, ts);
+            } else {
+              throw new HoodieSourcePostProcessException("Unsupported preCombine time format " + preCombineFieldType);
+            }
+          }
+          return result.toString();
+        } else {
+          // there might be some ddl data, ignore it
+          return null;
+        }
+      } else {
+        // not the data from target table(s), ignore it
+        return null;
+      }
+    }).filter(Objects::nonNull);
+  }
+
+  /**
+   * Check if it is the right table we want to consume from.
+   *
+   * @param database the database the data belongs to
+   * @param table    the table the data belongs to
+   */
+  private boolean isTargetTable(String database, String table) {
+    String databaseRegex = this.props.getString(Config.DATABASE_NAME_REGEX_PROP.key());
+    String tableRegex = this.props.getString(Config.TABLE_NAME_REGEX_PROP.key());
+    return Pattern.matches(databaseRegex, database) && Pattern.matches(tableRegex, table);
+  }
+
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/PreCombineFieldType.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/PreCombineFieldType.java
new file mode 100644
index 0000000000000..d3969a02bc4f3
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/PreCombineFieldType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.processor.maxwell;
+
+/**
+ * Enum of preCombine field time type.
+ */
+public enum PreCombineFieldType {
+  /**
+   * Not a timestamp type field.
+   */
+  NON_TIMESTAMP,
+
+  /**
+   * Timestamp type field in string format.
+   */
+  DATE_STRING,
+
+  /**
+   * Timestamp type field in UNIX_TIMESTAMP format.
+   */
+  UNIX_TIMESTAMP,
+
+  /**
+   * Timestamp type field in EPOCHMILLISECONDS format.
+   */
+  EPOCHMILLISECONDS
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
index bd150ed29be38..b1228ea9ac1f1 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
@@ -120,6 +120,12 @@ public void testChainedJsonKafkaSourcePostProcessor() {
     assertEquals(0, fetch1.getBatch().get().count());
   }
 
+  @Test
+  public void testMaxwellJsonKafkaSourcePostProcessor() {
+
+
+  }
+
   /**
    * JsonKafkaSourcePostProcessor that return a sub RDD of the incoming data which get the data from incoming data using
    * {org.apache.spark.api.java.JavaRDD#sample(boolean, double, long)} method.
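
Usage note (not part of the patch): to make the intended wiring concrete, here is a minimal
sketch of driving the processor above by hand. This is a sketch only: it assumes a live
JavaSparkContext named `jsc`, the regex and preCombine values are chosen to match the sample
rows in maxwell-json-string.json added below, and the file-based input is hypothetical (in a
real DeltaStreamer job the records come from JsonKafkaSource and these properties come from
the job's properties file):

    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.database.regex", "test");
    props.setProperty("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.table.regex", "maxwell");
    // `ts` in a Maxwell envelope is epoch seconds; render it as a date string on deletes
    props.setProperty("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.precombine.field.type", "DATE_STRING");
    props.setProperty("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.precombine.field.format", "yyyy-MM-dd HH:mm:ss");
    props.setProperty(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "update_time");

    // hypothetical input wiring; in practice the records arrive from Kafka
    JavaRDD<String> maxwellJson = jsc.textFile("maxwell-json-string.json");
    JavaRDD<String> hudiJson = new MaxwellJsonKafkaSourcePostProcessor(props).process(maxwellJson);
    // insert/update rows come out as their `data` payload plus "_hoodie_is_deleted": false;
    // the delete row is tagged "_hoodie_is_deleted": true, with `update_time` rewritten from `ts`.
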
diff --git a/hudi-utilities/src/test/resources/maxwell-post-processor/maxwell-json-string.json b/hudi-utilities/src/test/resources/maxwell-post-processor/maxwell-json-string.json
new file mode 100644
index 0000000000000..2a61035f9531c
--- /dev/null
+++ b/hudi-utilities/src/test/resources/maxwell-post-processor/maxwell-json-string.json
@@ -0,0 +1,3 @@
+{"database": "test", "table": "maxwell", "type": "insert", "ts": 1449786310, "xid": 940752, "commit": true, "data": { "id":1, "daemon": "Stanislaw Lem" }}
+{"database": "test", "table": "maxwell", "type": "update", "ts": 1449786341, "xid": 940786, "commit": true, "data": {"id":1, "daemon": "Firebus! Firebus!"}, "old": {"daemon": "Stanislaw Lem"}}
+{"database": "test", "table": "maxwell", "type": "delete", "ts": 1449787751, "xid": 940985, "commit": true, "data": {"id":1, "daemon": "Firebus! Firebus!"}, "old": {"daemon": "Stanislaw Lem"}}

From 05858e09bcdb7381be01f3fe9d213089316eddd2 Mon Sep 17 00:00:00 2001
From: wangxianghu
Date: Sat, 12 Mar 2022 18:09:22 +0400
Subject: [PATCH 2/3] add ut

---
 .../MaxwellJsonKafkaSourcePostProcessor.java  |  16 +--
 .../TestJsonKafkaSourcePostProcessor.java     | 130 +++++++++++++++++-
 .../maxwell-json-string.json                  |   3 -
 3 files changed, 135 insertions(+), 14 deletions(-)
 delete mode 100644 hudi-utilities/src/test/resources/maxwell-post-processor/maxwell-json-string.json

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
index 5bdb8381e6672..95a11cdce4dfa 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
@@ -79,23 +79,23 @@ public MaxwellJsonKafkaSourcePostProcessor(TypedProperties props) {
    * Configs to be passed for this processor.
    */
   public static class Config {
-    private static final ConfigProperty<String> DATABASE_NAME_REGEX_PROP = ConfigProperty
+    public static final ConfigProperty<String> DATABASE_NAME_REGEX_PROP = ConfigProperty
         .key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.database.regex")
         .noDefaultValue()
         .withDocumentation("Database name regex.");
 
-    private static final ConfigProperty<String> TABLE_NAME_REGEX_PROP = ConfigProperty
+    public static final ConfigProperty<String> TABLE_NAME_REGEX_PROP = ConfigProperty
         .key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.table.regex")
         .noDefaultValue()
         .withDocumentation("Table name regex.");
 
-    private static final ConfigProperty<String> PRECOMBINE_FIELD_TYPE_PROP = ConfigProperty
+    public static final ConfigProperty<String> PRECOMBINE_FIELD_TYPE_PROP = ConfigProperty
         .key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.precombine.field.type")
         .defaultValue("DATE_STRING")
         .withDocumentation("Data type of the preCombine field. Could be NON_TIMESTAMP, DATE_STRING, "
             + "UNIX_TIMESTAMP or EPOCHMILLISECONDS. DATE_STRING by default.");
 
-    private static final ConfigProperty<String> PRECOMBINE_FIELD_FORMAT_PROP = ConfigProperty
+    public static final ConfigProperty<String> PRECOMBINE_FIELD_FORMAT_PROP = ConfigProperty
         .key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.precombine.field.format")
         .defaultValue("yyyy-MM-dd HH:mm:ss")
         .withDocumentation("When the preCombine field is in DATE_STRING format, the user should tell Hudi "
             + "what format it is. 'yyyy-MM-dd HH:mm:ss' by default.");
   }
 
@@ -106,8 +106,8 @@ public static class Config {
   public JavaRDD<String> process(JavaRDD<String> maxwellJsonRecords) {
     return maxwellJsonRecords.map(record -> {
       JsonNode inputJson = MAPPER.readTree(record);
-      String database = inputJson.get(DATABASE).toString();
-      String table = inputJson.get(TABLE).toString();
+      String database = inputJson.get(DATABASE).textValue();
+      String table = inputJson.get(TABLE).textValue();
 
       // filter out target databases and tables
       if (isTargetTable(database, table)) {
@@ -115,7 +115,7 @@ public JavaRDD<String> process(JavaRDD<String> maxwellJsonRecords) {
         LOG.info(String.format("Maxwell source processor starts to process table: %s.%s", database, table));
 
         ObjectNode result = (ObjectNode) inputJson.get(DATA);
-        String type = inputJson.get(OPERATION_TYPE).toString();
+        String type = inputJson.get(OPERATION_TYPE).textValue();
 
         // insert or update
         if (INSERT.equals(type) || UPDATE.equals(type)) {
@@ -130,7 +130,7 @@ public JavaRDD<String> process(JavaRDD<String> maxwellJsonRecords) {
 
           PreCombineFieldType preCombineFieldType =
               valueOf(this.props.getString(Config.PRECOMBINE_FIELD_TYPE_PROP.key(),
-                  Config.PRECOMBINE_FIELD_TYPE_PROP.defaultValue()).toLowerCase(Locale.ROOT));
+                  Config.PRECOMBINE_FIELD_TYPE_PROP.defaultValue()).toUpperCase(Locale.ROOT));
 
           // maxwell won't update the `update_time`(delete time) field of the record which is tagged as delete. so if we
           // want to delete this record correctly, we should update its `update_time` to a time closer to where the
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
index b1228ea9ac1f1..80ac2f921ecd5 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
@@ -18,24 +18,36 @@
 
 package org.apache.hudi.utilities.sources;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.DateTimeUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
 import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
+import org.apache.hudi.utilities.sources.processor.maxwell.MaxwellJsonKafkaSourcePostProcessor;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Objects;
 
 import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.JSON_KAFKA_PROCESSOR_CLASS_OPT;
 import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestJsonKafkaSourcePostProcessor extends TestJsonKafkaSource {
 
@@ -121,9 +133,121 @@ public void testChainedJsonKafkaSourcePostProcessor() {
   }
 
   @Test
-  public void testMaxwellJsonKafkaSourcePostProcessor() {
-
-
+  public void testMaxwellJsonKafkaSourcePostProcessor() throws IOException {
+    // ------------------------------------------------------------------------
+    //  Maxwell data
+    // ------------------------------------------------------------------------
+
+    // database hudi, table hudi_maxwell_01 (insert, update and delete)
+    String hudiMaxwell01Insert = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\",\"type\":\"insert\"," +
+        "\"ts\":1647074402,\"xid\":6233,\"commit\":true,\"data\":{\"id\":\"6018220e39e74477b45c7cf42f66bdc0\"," +
+        "\"name\":\"mathieu\",\"age\":18,\"insert_time\":\"2022-03-12 08:40:02\"," +
+        "\"update_time\":\"2022-03-12 08:40:02\"}}";
+
+    String hudiMaxwell01Update = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\",\"type\":\"update\"," +
+        "\"ts\":1647074482,\"xid\":6440,\"commit\":true,\"data\":{\"id\":\"6018220e39e74477b45c7cf42f66bdc0\"," +
+        "\"name\":\"mathieu\",\"age\":20,\"insert_time\":\"2022-03-12 04:40:02\",\"update_time\":\"2022-03-12 04:42:25\"}," +
+        "\"old\":{\"age\":18,\"insert_time\":\"2022-03-12 08:40:02\",\"update_time\":\"2022-03-12 08:40:02\"}}";
+
+    String hudiMaxwell01Delete = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\",\"type\":\"delete\"," +
+        "\"ts\":1647074555,\"xid\":6631,\"commit\":true,\"data\":{\"id\":\"6018220e39e74477b45c7cf42f66bdc0\"," +
+        "\"name\":\"mathieu\",\"age\":20,\"insert_time\":\"2022-03-12 04:40:02\",\"update_time\":\"2022-03-12 04:42:25\"}}";
+
+    String hudiMaxwell01Ddl = "{\"type\":\"table-alter\",\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\"," +
+        "\"old\":{\"database\":\"hudi\",\"charset\":\"utf8\",\"table\":\"hudi_maxwell_01\"," +
+        "\"primary-key\":[\"id\"],\"columns\":[{\"type\":\"varchar\",\"name\":\"id\",\"charset\":\"utf8\"}," +
+        "{\"type\":\"varchar\",\"name\":\"name\",\"charset\":\"utf8\"},{\"type\":\"int\",\"name\":\"age\"," +
+        "\"signed\":true},{\"type\":\"timestamp\",\"name\":\"insert_time\",\"column-length\":0}," +
+        "{\"type\":\"timestamp\",\"name\":\"update_time\",\"column-length\":0}]},\"def\":{\"database\":\"hudi\"," +
+        "\"charset\":\"utf8\",\"table\":\"hudi_maxwell_01\",\"primary-key\":[\"id\"]," +
+        "\"columns\":[{\"type\":\"varchar\",\"name\":\"id\",\"charset\":\"utf8\"},{\"type\":\"varchar\"," +
+        "\"name\":\"name\",\"charset\":\"utf8\"},{\"type\":\"int\",\"name\":\"age\",\"signed\":true}," +
+        "{\"type\":\"timestamp\",\"name\":\"insert_time\",\"column-length\":0},{\"type\":\"timestamp\"," +
+        "\"name\":\"update_time\",\"column-length\":0}]},\"ts\":1647072305000,\"sql\":\"/* ApplicationName=DBeaver " +
+        "21.0.4 - Main */ ALTER TABLE hudi.hudi_maxwell_01 MODIFY COLUMN age int(3) NULL\"}";
+
+    // database hudi, table hudi_maxwell_010, insert
+    String hudiMaxwell010Insert = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_010\",\"type\":\"insert\"," +
+        "\"ts\":1647073982,\"xid\":5164,\"commit\":true,\"data\":{\"id\":\"f3eaf4cdf7534e47a88cdf93d19b2ee6\"," +
"\"name\":\"wangxianghu\",\"age\":18,\"insert_time\":\"2022-03-12 08:33:02\"," + + "\"update_time\":\"2022-03-12 08:33:02\"}}"; + + // database hudi_02, table hudi_maxwell_02, insert + String hudi02Maxwell02Insert = "{\"database\":\"hudi_02\",\"table\":\"hudi_maxwell_02\",\"type\":\"insert\"," + + "\"ts\":1647073916,\"xid\":4990,\"commit\":true,\"data\":{\"id\":\"9bb17f316ee8488cb107621ddf0f3cb0\"," + + "\"name\":\"andy\",\"age\":17,\"insert_time\":\"2022-03-12 08:31:56\"," + + "\"update_time\":\"2022-03-12 08:31:56\"}}"; + + // ------------------------------------------------------------------------ + // Tests + // ------------------------------------------------------------------------ + + ObjectMapper mapper = new ObjectMapper(); + TypedProperties props = new TypedProperties(); + props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.DATABASE_NAME_REGEX_PROP.key(), "hudi(_)?[0-9]{0,2}"); + props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.TABLE_NAME_REGEX_PROP.key(), "hudi_maxwell(_)?[0-9]{0,2}"); + + // test insert and update + JavaRDD inputInsertAndUpdate = jsc().parallelize(Arrays.asList(hudiMaxwell01Insert, hudiMaxwell01Update)); + MaxwellJsonKafkaSourcePostProcessor processor = new MaxwellJsonKafkaSourcePostProcessor(props); + processor.process(inputInsertAndUpdate).map(mapper::readTree).foreach(record -> { + // database name should be null + JsonNode database = record.get("database"); + // insert and update records should be tagged as no delete + boolean isDelete = record.get(HoodieRecord.HOODIE_IS_DELETED).booleanValue(); + + assertFalse(isDelete); + assertNull(database); + }); + + // test delete + props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.PRECOMBINE_FIELD_TYPE_PROP.key(), "DATE_STRING"); + props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.PRECOMBINE_FIELD_FORMAT_PROP.key(), "yyyy-MM-dd HH:mm:ss"); + props.setProperty(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "update_time"); + + JavaRDD inputDelete = jsc().parallelize(Collections.singletonList(hudiMaxwell01Delete)); + + long ts = mapper.readTree(hudiMaxwell01Delete).get("ts").longValue(); + String formatTs = DateTimeUtils.formatUnixTimestamp(ts, "yyyy-MM-dd HH:mm:ss"); + + new MaxwellJsonKafkaSourcePostProcessor(props) + .process(inputDelete).map(mapper::readTree).foreach(record -> { + + // delete records should be tagged as delete + boolean isDelete = record.get(HoodieRecord.HOODIE_IS_DELETED).booleanValue(); + // update_time should equals ts + String updateTime = record.get("update_time").textValue(); + + assertEquals(formatTs, updateTime); + assertTrue(isDelete); + }); + + // test preCombine field is not time + props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.PRECOMBINE_FIELD_TYPE_PROP.key(), "NON_TIMESTAMP"); + props.setProperty(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "id"); + + JavaRDD inputDelete2 = jsc().parallelize(Collections.singletonList(hudiMaxwell01Delete)); + + String updateTimeInUpdate = mapper.readTree(hudiMaxwell01Update).get("data").get("update_time").textValue(); + new MaxwellJsonKafkaSourcePostProcessor(props) + .process(inputDelete2).map(mapper::readTree).foreach(record -> { + + // updateTimeInUpdate should updateTimeInDelete + String updateTimeInDelete = record.get("update_time").textValue(); + assertEquals(updateTimeInUpdate, updateTimeInDelete); + }); + + // test database, table regex + JavaRDD dirtyData = jsc().parallelize(Arrays.asList(hudiMaxwell01Insert, hudiMaxwell010Insert, hudi02Maxwell02Insert)); + long validDataNum = 
+    // hudiMaxwell010Insert is dirty data: its table name does not match the table regex
+    assertEquals(2, validDataNum);
+
+    // test ddl
+    JavaRDD<String> ddlData = jsc().parallelize(Collections.singletonList(hudiMaxwell01Ddl));
+    // ddl data will be ignored, the count should be 0
+    long ddlDataNum = processor.process(ddlData).count();
+    assertEquals(0, ddlDataNum);
   }
 
   /**
diff --git a/hudi-utilities/src/test/resources/maxwell-post-processor/maxwell-json-string.json b/hudi-utilities/src/test/resources/maxwell-post-processor/maxwell-json-string.json
deleted file mode 100644
index 2a61035f9531c..0000000000000
--- a/hudi-utilities/src/test/resources/maxwell-post-processor/maxwell-json-string.json
+++ /dev/null
@@ -1,3 +0,0 @@
-{"database": "test", "table": "maxwell", "type": "insert", "ts": 1449786310, "xid": 940752, "commit": true, "data": { "id":1, "daemon": "Stanislaw Lem" }}
-{"database": "test", "table": "maxwell", "type": "update", "ts": 1449786341, "xid": 940786, "commit": true, "data": {"id":1, "daemon": "Firebus! Firebus!"}, "old": {"daemon": "Stanislaw Lem"}}
-{"database": "test", "table": "maxwell", "type": "delete", "ts": 1449787751, "xid": 940985, "commit": true, "data": {"id":1, "daemon": "Firebus! Firebus!"}, "old": {"daemon": "Stanislaw Lem"}}

From fe5eb55a4a5a840b49a6bd209ac3e84f4245b72d Mon Sep 17 00:00:00 2001
From: wangxianghu
Date: Tue, 15 Mar 2022 10:10:08 +0400
Subject: [PATCH 3/3] Address comment

---
 .../MaxwellJsonKafkaSourcePostProcessor.java  | 74 ++++++++++---------
 1 file changed, 39 insertions(+), 35 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
index 95a11cdce4dfa..9ca91893bec69 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
@@ -125,41 +125,7 @@ public JavaRDD<String> process(JavaRDD<String> maxwellJsonRecords) {
 
           // delete
         } else if (DELETE.equals(type)) {
-          // tag this record as delete.
-          result.put(HoodieRecord.HOODIE_IS_DELETED, true);
-
-          PreCombineFieldType preCombineFieldType =
-              valueOf(this.props.getString(Config.PRECOMBINE_FIELD_TYPE_PROP.key(),
-                  Config.PRECOMBINE_FIELD_TYPE_PROP.defaultValue()).toUpperCase(Locale.ROOT));
-
-          // maxwell won't update the `update_time`(delete time) field of the record which is tagged as delete. so if we
-          // want to delete this record correctly, we should update its `update_time` to a time closer to where the
-          // delete operation actually occurred. here we use `ts` from maxwell json string as this 'delete' time.
-
-          // we can update the `update_time`(delete time) only when it is in timestamp format.
-          if (!preCombineFieldType.equals(NON_TIMESTAMP)) {
-            String preCombineField = this.props.getString(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(),
-                HoodieWriteConfig.PRECOMBINE_FIELD_NAME.defaultValue());
-
-            // ts from maxwell
-            long ts = inputJson.get(TS).longValue();
-
-            // convert the `update_time`(delete time) to the proper format.
-            if (preCombineFieldType.equals(DATE_STRING)) {
-              // DATE_STRING format
-              String timeFormat = this.props.getString(Config.PRECOMBINE_FIELD_FORMAT_PROP.key(), Config.PRECOMBINE_FIELD_FORMAT_PROP.defaultValue());
-              result.put(preCombineField, DateTimeUtils.formatUnixTimestamp(ts, timeFormat));
-            } else if (preCombineFieldType.equals(EPOCHMILLISECONDS)) {
-              // EPOCHMILLISECONDS format
-              result.put(preCombineField, ts * 1000L);
-            } else if (preCombineFieldType.equals(UNIX_TIMESTAMP)) {
-              // UNIX_TIMESTAMP format
-              result.put(preCombineField, ts);
-            } else {
-              throw new HoodieSourcePostProcessException("Unsupported preCombine time format " + preCombineFieldType);
-            }
-          }
-          return result.toString();
+          return processDelete(inputJson, result);
         } else {
           // there might be some ddl data, ignore it
           return null;
@@ -171,6 +137,44 @@ public JavaRDD<String> process(JavaRDD<String> maxwellJsonRecords) {
     }).filter(Objects::nonNull);
   }
 
+  private String processDelete(JsonNode inputJson, ObjectNode result) {
+    // tag this record as delete.
+    result.put(HoodieRecord.HOODIE_IS_DELETED, true);
+
+    PreCombineFieldType preCombineFieldType =
+        valueOf(this.props.getString(Config.PRECOMBINE_FIELD_TYPE_PROP.key(),
+            Config.PRECOMBINE_FIELD_TYPE_PROP.defaultValue()).toUpperCase(Locale.ROOT));
+
+    // maxwell won't update the `update_time`(delete time) field of the record which is tagged as delete. so if we
+    // want to delete this record correctly, we should update its `update_time` to a time closer to where the
+    // delete operation actually occurred. here we use `ts` from maxwell json string as this 'delete' time.
+
+    // we can update the `update_time`(delete time) only when it is in timestamp format.
+    if (!preCombineFieldType.equals(NON_TIMESTAMP)) {
+      String preCombineField = this.props.getString(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(),
+          HoodieWriteConfig.PRECOMBINE_FIELD_NAME.defaultValue());
+
+      // ts from maxwell
+      long ts = inputJson.get(TS).longValue();
+
+      // convert the `update_time`(delete time) to the proper format.
+      if (preCombineFieldType.equals(DATE_STRING)) {
+        // DATE_STRING format
+        String timeFormat = this.props.getString(Config.PRECOMBINE_FIELD_FORMAT_PROP.key(), Config.PRECOMBINE_FIELD_FORMAT_PROP.defaultValue());
+        result.put(preCombineField, DateTimeUtils.formatUnixTimestamp(ts, timeFormat));
+      } else if (preCombineFieldType.equals(EPOCHMILLISECONDS)) {
+        // EPOCHMILLISECONDS format
+        result.put(preCombineField, ts * 1000L);
+      } else if (preCombineFieldType.equals(UNIX_TIMESTAMP)) {
+        // UNIX_TIMESTAMP format
+        result.put(preCombineField, ts);
+      } else {
+        throw new HoodieSourcePostProcessException("Unsupported preCombine time format " + preCombineFieldType);
+      }
+    }
+    return result.toString();
+  }
+
   /**
    * Check if it is the right table we want to consume from.
    *