diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
index e52e5660957dd..531a0903f66fc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java
@@ -21,6 +21,9 @@
 import java.time.Duration;
 import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeParseException;
 import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
@@ -126,6 +129,20 @@ private static Map<String, ChronoUnit> initMap() {
     return labelToUnit;
   }
 
+  /**
+   * Converts a UNIX timestamp in epoch seconds to a date-time string in the given format, using the system default time zone.
+   *
+   * @param unixTimestamp UNIX timestamp in epoch seconds
+   * @param timeFormat    target DateTimeFormatter pattern, e.g. 'yyyy-MM-dd HH:mm:ss'
+   */
+  public static String formatUnixTimestamp(long unixTimestamp, String timeFormat) {
+    ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(timeFormat));
+    DateTimeFormatter dtf = DateTimeFormatter.ofPattern(timeFormat);
+    return LocalDateTime
+        .ofInstant(Instant.ofEpochSecond(unixTimestamp), ZoneId.systemDefault())
+        .format(dtf);
+  }
+
   /**
    * Enum which defines time unit, mostly used to parse value from configuration file.
    */
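A quick usage sketch of the new helper (not part of the patch): formatUnixTimestamp formats in ZoneId.systemDefault(), so the value shown below assumes a UTC default zone.

    long ts = 1647074402L; // epoch seconds, taken from one of the Maxwell test events below
    String formatted = DateTimeUtils.formatUnixTimestamp(ts, "yyyy-MM-dd HH:mm:ss");
    // formatted == "2022-03-12 08:40:02" when the default zone is UTC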
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
new file mode 100644
index 0000000000000..9ca91893bec69
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.processor.maxwell;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.DateTimeUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
+import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.Locale;
+import java.util.Objects;
+import java.util.regex.Pattern;
+
+import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.DATE_STRING;
+import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.EPOCHMILLISECONDS;
+import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.NON_TIMESTAMP;
+import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.UNIX_TIMESTAMP;
+import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.valueOf;
+
+/**
+ * A {@link JsonKafkaSourcePostProcessor} that extracts the fresh row data from a Maxwell JSON string and tags each
+ * record as a delete or not.
+ */
+public class MaxwellJsonKafkaSourcePostProcessor extends JsonKafkaSourcePostProcessor {
+
+  private static final Logger LOG = LogManager.getLogger(MaxwellJsonKafkaSourcePostProcessor.class);
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  public MaxwellJsonKafkaSourcePostProcessor(TypedProperties props) {
+    super(props);
+  }
+
+  // ------------------------------------------------------------------------
+  //  Partial fields in the Maxwell JSON string
+  // ------------------------------------------------------------------------
+
+  private static final String DATABASE = "database";
+  private static final String TABLE = "table";
+  private static final String DATA = "data";
+  private static final String OPERATION_TYPE = "type";
+  private static final String TS = "ts";
+
+  // ------------------------------------------------------------------------
+  //  Operation types
+  // ------------------------------------------------------------------------
+
+  private static final String INSERT = "insert";
+  private static final String UPDATE = "update";
+  private static final String DELETE = "delete";
+
+  /**
+   * Configs to be passed for this processor.
+   */
+  public static class Config {
+    public static final ConfigProperty<String> DATABASE_NAME_REGEX_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.database.regex")
+        .noDefaultValue()
+        .withDocumentation("Database name regex.");
+
+    public static final ConfigProperty<String> TABLE_NAME_REGEX_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.table.regex")
+        .noDefaultValue()
+        .withDocumentation("Table name regex.");
+
+    public static final ConfigProperty<String> PRECOMBINE_FIELD_TYPE_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.precombine.field.type")
+        .defaultValue("DATE_STRING")
+        .withDocumentation("Data type of the preCombine field. Could be NON_TIMESTAMP, DATE_STRING, " +
+            "UNIX_TIMESTAMP or EPOCHMILLISECONDS. DATE_STRING by default.");
+
+    public static final ConfigProperty<String> PRECOMBINE_FIELD_FORMAT_PROP = ConfigProperty
+        .key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.precombine.field.format")
+        .defaultValue("yyyy-MM-dd HH:mm:ss")
+        .withDocumentation("When the preCombine field is in DATE_STRING format, the user should tell Hudi " +
+            "what format it is. 'yyyy-MM-dd HH:mm:ss' by default.");
+  }
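+
+  // Usage sketch (illustrative only, mirroring the unit test added below): the processor is driven
+  // by the four Config keys above plus Hudi's existing preCombine field name config, e.g.
+  //   TypedProperties props = new TypedProperties();
+  //   props.setProperty(Config.DATABASE_NAME_REGEX_PROP.key(), "hudi(_)?[0-9]{0,2}");
+  //   props.setProperty(Config.TABLE_NAME_REGEX_PROP.key(), "hudi_maxwell(_)?[0-9]{0,2}");
+  //   props.setProperty(Config.PRECOMBINE_FIELD_TYPE_PROP.key(), "DATE_STRING");
+  //   props.setProperty(Config.PRECOMBINE_FIELD_FORMAT_PROP.key(), "yyyy-MM-dd HH:mm:ss");
+  //   props.setProperty(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "update_time");
+  //   JsonKafkaSourcePostProcessor processor = new MaxwellJsonKafkaSourcePostProcessor(props);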
+
+  @Override
+  public JavaRDD<String> process(JavaRDD<String> maxwellJsonRecords) {
+    return maxwellJsonRecords.map(record -> {
+      JsonNode inputJson = MAPPER.readTree(record);
+      String database = inputJson.get(DATABASE).textValue();
+      String table = inputJson.get(TABLE).textValue();
+
+      // keep only records from the target database(s) and table(s)
+      if (isTargetTable(database, table)) {
+
+        LOG.info(String.format("Maxwell source processor starts to process table: %s.%s", database, table));
+
+        ObjectNode result = (ObjectNode) inputJson.get(DATA);
+        String type = inputJson.get(OPERATION_TYPE).textValue();
+
+        // insert or update
+        if (INSERT.equals(type) || UPDATE.equals(type)) {
+          // tag this record as not deleted
+          result.put(HoodieRecord.HOODIE_IS_DELETED, false);
+          return result.toString();
+
+          // delete
+        } else if (DELETE.equals(type)) {
+          return processDelete(inputJson, result);
+        } else {
+          // there might be some DDL records, ignore them
+          return null;
+        }
+      } else {
+        // not from the target table(s), ignore it
+        return null;
+      }
+    }).filter(Objects::nonNull);
+  }
+
+  private String processDelete(JsonNode inputJson, ObjectNode result) {
+    // tag this record as deleted
+    result.put(HoodieRecord.HOODIE_IS_DELETED, true);
+
+    PreCombineFieldType preCombineFieldType =
+        valueOf(this.props.getString(Config.PRECOMBINE_FIELD_TYPE_PROP.key(),
+            Config.PRECOMBINE_FIELD_TYPE_PROP.defaultValue()).toUpperCase(Locale.ROOT));
+
+    // Maxwell does not update the `update_time`(delete time) field of a record that is deleted. So if we
+    // want to delete this record correctly, we should set its `update_time` to a time close to when the
+    // delete operation actually occurred. Here we use `ts` from the Maxwell JSON string as this 'delete' time.
+
+    // we can update the `update_time`(delete time) only when it is in a timestamp format.
+    if (!preCombineFieldType.equals(NON_TIMESTAMP)) {
+      String preCombineField = this.props.getString(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(),
+          HoodieWriteConfig.PRECOMBINE_FIELD_NAME.defaultValue());
+
+      // `ts` from Maxwell, in epoch seconds
+      long ts = inputJson.get(TS).longValue();
+
+      // convert the `update_time`(delete time) to the proper format.
+      if (preCombineFieldType.equals(DATE_STRING)) {
+        // DATE_STRING format
+        String timeFormat = this.props.getString(Config.PRECOMBINE_FIELD_FORMAT_PROP.key(),
+            Config.PRECOMBINE_FIELD_FORMAT_PROP.defaultValue());
+        result.put(preCombineField, DateTimeUtils.formatUnixTimestamp(ts, timeFormat));
+      } else if (preCombineFieldType.equals(EPOCHMILLISECONDS)) {
+        // EPOCHMILLISECONDS format
+        result.put(preCombineField, ts * 1000L);
+      } else if (preCombineFieldType.equals(UNIX_TIMESTAMP)) {
+        // UNIX_TIMESTAMP format
+        result.put(preCombineField, ts);
+      } else {
+        throw new HoodieSourcePostProcessException("Unsupported preCombine time format " + preCombineFieldType);
+      }
+    }
+    return result.toString();
+  }
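+
+  // Worked example of the delete path (illustrative only, using the delete event from the unit test;
+  // assumes DATE_STRING with pattern 'yyyy-MM-dd HH:mm:ss', preCombine field 'update_time', and a UTC
+  // default zone -- the formatted value shifts with ZoneId.systemDefault()):
+  //   in:  {"database":"hudi","table":"hudi_maxwell_01","type":"delete","ts":1647074555,
+  //         "data":{"id":"...","update_time":"2022-03-12 04:42:25", ...}}
+  //   out: {"id":"...","update_time":"2022-03-12 08:42:35","_hoodie_is_deleted":true, ...}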
+
+  /**
+   * Checks whether the record comes from one of the database(s) and table(s) we want to consume.
+   *
+   * @param database database the data belongs to
+   * @param table    table the data belongs to
+   */
+  private boolean isTargetTable(String database, String table) {
+    String databaseRegex = this.props.getString(Config.DATABASE_NAME_REGEX_PROP.key());
+    String tableRegex = this.props.getString(Config.TABLE_NAME_REGEX_PROP.key());
+    return Pattern.matches(databaseRegex, database) && Pattern.matches(tableRegex, table);
+  }
+
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/PreCombineFieldType.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/PreCombineFieldType.java
new file mode 100644
index 0000000000000..d3969a02bc4f3
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/PreCombineFieldType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.processor.maxwell;
+
+/**
+ * Enum of preCombine field time types.
+ */
+public enum PreCombineFieldType {
+  /**
+   * Not a timestamp type field.
+   */
+  NON_TIMESTAMP,
+
+  /**
+   * Timestamp type field in date string format.
+   */
+  DATE_STRING,
+
+  /**
+   * Timestamp type field in UNIX_TIMESTAMP (epoch seconds) format.
+   */
+  UNIX_TIMESTAMP,
+
+  /**
+   * Timestamp type field in EPOCHMILLISECONDS format.
+   */
+  EPOCHMILLISECONDS
+}
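For intuition, here is how the regex filter in isTargetTable plays out with the patterns the new test uses (a sketch, not part of the patch; Pattern.matches requires a full-string match):

    Pattern.matches("hudi(_)?[0-9]{0,2}", "hudi_02");                  // true  -> kept
    Pattern.matches("hudi_maxwell(_)?[0-9]{0,2}", "hudi_maxwell_01");  // true  -> kept
    Pattern.matches("hudi_maxwell(_)?[0-9]{0,2}", "hudi_maxwell_010"); // false -> dropped, "010" exceeds [0-9]{0,2}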
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
index bd150ed29be38..80ac2f921ecd5 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
@@ -18,24 +18,36 @@
 
 package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.DateTimeUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
 import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
+import org.apache.hudi.utilities.sources.processor.maxwell.MaxwellJsonKafkaSourcePostProcessor;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Objects;
 
 import static org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.Config.JSON_KAFKA_PROCESSOR_CLASS_OPT;
 import static org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers.jsonifyRecords;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestJsonKafkaSourcePostProcessor extends TestJsonKafkaSource {
@@ -120,6 +132,124 @@
     assertEquals(0, fetch1.getBatch().get().count());
   }
 
+  @Test
+  public void testMaxwellJsonKafkaSourcePostProcessor() throws IOException {
+    // ------------------------------------------------------------------------
+    //  Maxwell data
+    // ------------------------------------------------------------------------
+
+    // database hudi, table hudi_maxwell_01 (insert, update and delete)
+    String hudiMaxwell01Insert = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\",\"type\":\"insert\"," +
+        "\"ts\":1647074402,\"xid\":6233,\"commit\":true,\"data\":{\"id\":\"6018220e39e74477b45c7cf42f66bdc0\"," +
+        "\"name\":\"mathieu\",\"age\":18,\"insert_time\":\"2022-03-12 08:40:02\"," +
+        "\"update_time\":\"2022-03-12 08:40:02\"}}";
+
+    String hudiMaxwell01Update = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\",\"type\":\"update\"," +
+        "\"ts\":1647074482,\"xid\":6440,\"commit\":true,\"data\":{\"id\":\"6018220e39e74477b45c7cf42f66bdc0\"," +
+        "\"name\":\"mathieu\",\"age\":20,\"insert_time\":\"2022-03-12 04:40:02\",\"update_time\":\"2022-03-12 04:42:25\"}," +
+        "\"old\":{\"age\":18,\"insert_time\":\"2022-03-12 08:40:02\",\"update_time\":\"2022-03-12 08:40:02\"}}";
"\"ts\":1647074555,\"xid\":6631,\"commit\":true,\"data\":{\"id\":\"6018220e39e74477b45c7cf42f66bdc0\"," + + "\"name\":\"mathieu\",\"age\":20,\"insert_time\":\"2022-03-12 04:40:02\",\"update_time\":\"2022-03-12 04:42:25\"}}"; + + String hudiMaxwell01Ddl = "{\"type\":\"table-alter\",\"database\":\"hudi\",\"table\":\"hudi_maxwell_01\"," + + "\"old\":{\"database\":\"hudi\",\"charset\":\"utf8\",\"table\":\"hudi_maxwell_01\"," + + "\"primary-key\":[\"id\"],\"columns\":[{\"type\":\"varchar\",\"name\":\"id\",\"charset\":\"utf8\"}," + + "{\"type\":\"varchar\",\"name\":\"name\",\"charset\":\"utf8\"},{\"type\":\"int\",\"name\":\"age\"," + + "\"signed\":true},{\"type\":\"timestamp\",\"name\":\"insert_time\",\"column-length\":0}," + + "{\"type\":\"timestamp\",\"name\":\"update_time\",\"column-length\":0}]},\"def\":{\"database\":\"hudi\"," + + "\"charset\":\"utf8\",\"table\":\"hudi_maxwell_01\",\"primary-key\":[\"id\"]," + + "\"columns\":[{\"type\":\"varchar\",\"name\":\"id\",\"charset\":\"utf8\"},{\"type\":\"varchar\"," + + "\"name\":\"name\",\"charset\":\"utf8\"},{\"type\":\"int\",\"name\":\"age\",\"signed\":true}," + + "{\"type\":\"timestamp\",\"name\":\"insert_time\",\"column-length\":0},{\"type\":\"timestamp\"," + + "\"name\":\"update_time\",\"column-length\":0}]},\"ts\":1647072305000,\"sql\":\"/* ApplicationName=DBeaver " + + "21.0.4 - Main */ ALTER TABLE hudi.hudi_maxwell_01 MODIFY COLUMN age int(3) NULL\"}"; + + // database hudi, table hudi_maxwell_010, insert + String hudiMaxwell010Insert = "{\"database\":\"hudi\",\"table\":\"hudi_maxwell_010\",\"type\":\"insert\"," + + "\"ts\":1647073982,\"xid\":5164,\"commit\":true,\"data\":{\"id\":\"f3eaf4cdf7534e47a88cdf93d19b2ee6\"," + + "\"name\":\"wangxianghu\",\"age\":18,\"insert_time\":\"2022-03-12 08:33:02\"," + + "\"update_time\":\"2022-03-12 08:33:02\"}}"; + + // database hudi_02, table hudi_maxwell_02, insert + String hudi02Maxwell02Insert = "{\"database\":\"hudi_02\",\"table\":\"hudi_maxwell_02\",\"type\":\"insert\"," + + "\"ts\":1647073916,\"xid\":4990,\"commit\":true,\"data\":{\"id\":\"9bb17f316ee8488cb107621ddf0f3cb0\"," + + "\"name\":\"andy\",\"age\":17,\"insert_time\":\"2022-03-12 08:31:56\"," + + "\"update_time\":\"2022-03-12 08:31:56\"}}"; + + // ------------------------------------------------------------------------ + // Tests + // ------------------------------------------------------------------------ + + ObjectMapper mapper = new ObjectMapper(); + TypedProperties props = new TypedProperties(); + props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.DATABASE_NAME_REGEX_PROP.key(), "hudi(_)?[0-9]{0,2}"); + props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.TABLE_NAME_REGEX_PROP.key(), "hudi_maxwell(_)?[0-9]{0,2}"); + + // test insert and update + JavaRDD inputInsertAndUpdate = jsc().parallelize(Arrays.asList(hudiMaxwell01Insert, hudiMaxwell01Update)); + MaxwellJsonKafkaSourcePostProcessor processor = new MaxwellJsonKafkaSourcePostProcessor(props); + processor.process(inputInsertAndUpdate).map(mapper::readTree).foreach(record -> { + // database name should be null + JsonNode database = record.get("database"); + // insert and update records should be tagged as no delete + boolean isDelete = record.get(HoodieRecord.HOODIE_IS_DELETED).booleanValue(); + + assertFalse(isDelete); + assertNull(database); + }); + + // test delete + props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.PRECOMBINE_FIELD_TYPE_PROP.key(), "DATE_STRING"); + 
+
+    // test delete
+    props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.PRECOMBINE_FIELD_TYPE_PROP.key(), "DATE_STRING");
+    props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.PRECOMBINE_FIELD_FORMAT_PROP.key(), "yyyy-MM-dd HH:mm:ss");
+    props.setProperty(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "update_time");
+
+    JavaRDD<String> inputDelete = jsc().parallelize(Collections.singletonList(hudiMaxwell01Delete));
+
+    long ts = mapper.readTree(hudiMaxwell01Delete).get("ts").longValue();
+    String formatTs = DateTimeUtils.formatUnixTimestamp(ts, "yyyy-MM-dd HH:mm:ss");
+
+    new MaxwellJsonKafkaSourcePostProcessor(props)
+        .process(inputDelete).map(mapper::readTree).foreach(record -> {
+
+          // delete records should be tagged as deletes
+          boolean isDelete = record.get(HoodieRecord.HOODIE_IS_DELETED).booleanValue();
+          // update_time should equal the formatted `ts`
+          String updateTime = record.get("update_time").textValue();
+
+          assertEquals(formatTs, updateTime);
+          assertTrue(isDelete);
+        });
+
+    // test preCombine field that is not a timestamp
+    props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.PRECOMBINE_FIELD_TYPE_PROP.key(), "NON_TIMESTAMP");
+    props.setProperty(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "id");
+
+    JavaRDD<String> inputDelete2 = jsc().parallelize(Collections.singletonList(hudiMaxwell01Delete));
+
+    String updateTimeInUpdate = mapper.readTree(hudiMaxwell01Update).get("data").get("update_time").textValue();
+    new MaxwellJsonKafkaSourcePostProcessor(props)
+        .process(inputDelete2).map(mapper::readTree).foreach(record -> {
+
+          // with NON_TIMESTAMP the processor must not rewrite update_time, so it should still equal
+          // the value carried over from the preceding update
+          String updateTimeInDelete = record.get("update_time").textValue();
+          assertEquals(updateTimeInUpdate, updateTimeInDelete);
+        });
+
+    // test database/table regex
+    JavaRDD<String> dirtyData = jsc().parallelize(Arrays.asList(hudiMaxwell01Insert, hudiMaxwell010Insert, hudi02Maxwell02Insert));
+    long validDataNum = processor.process(dirtyData).count();
+    // hudiMaxwell010Insert is dirty data: "hudi_maxwell_010" does not match the table regex
+    assertEquals(2, validDataNum);
+
+    // test ddl
+    JavaRDD<String> ddlData = jsc().parallelize(Collections.singletonList(hudiMaxwell01Ddl));
+    // DDL records are ignored, so the count should be 0
+    long ddlDataNum = processor.process(ddlData).count();
+    assertEquals(0, ddlDataNum);
+  }
+
   /**
    * JsonKafkaSourcePostProcessor that return a sub RDD of the incoming data which get the data from incoming data using
    * {org.apache.spark.api.java.JavaRDD#sample(boolean, double, long)} method.