Make the Maxwell database filter optional and cache both filters as compiled patterns.

- MaxwellJsonKafkaSourcePostProcessor: read the database/table regex properties
  once in the constructor and compile them there, instead of re-reading the
  properties and recompiling the expressions on every isTargetTable call.
  When the database regex property is absent, only the table regex is applied.
- Remove the LOG.info call in process() and the logger that is no longer used.
- TestJsonKafkaSourcePostProcessor: cover the table-only case with a processor
  that is built after the properties are changed, because the filters are
  fixed at construction time.

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/processor/maxwell/MaxwellJsonKafkaSourcePostProcessor.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.DateTimeUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
 import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;
@@ -29,8 +30,6 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 
 import java.util.Locale;
@@ -49,12 +48,17 @@
  */
 public class MaxwellJsonKafkaSourcePostProcessor extends JsonKafkaSourcePostProcessor {
 
-  private static final Logger LOG = LogManager.getLogger(MaxwellJsonKafkaSourcePostProcessor.class);
-
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
+  private final Option<Pattern> databaseRegex;
+  private final Pattern tableRegex;
+
   public MaxwellJsonKafkaSourcePostProcessor(TypedProperties props) {
     super(props);
+    // compile the filters once here; isTargetTable reuses them on every call
+    String databaseRegexValue = props.getString(Config.DATABASE_NAME_REGEX_PROP.key(), null);
+    databaseRegex = Option.ofNullable(databaseRegexValue == null ? null : Pattern.compile(databaseRegexValue));
+    tableRegex = Pattern.compile(props.getString(Config.TABLE_NAME_REGEX_PROP.key()));
   }
 
   // ------------------------------------------------------------------------
@@ -111,9 +115,6 @@ public JavaRDD process(JavaRDD maxwellJsonRecords) {
 
       // filter out target databases and tables
       if (isTargetTable(database, table)) {
-
-        LOG.info(String.format("Maxwell source processor starts process table : %s.%s", database, table));
-
         ObjectNode result = (ObjectNode) inputJson.get(DATA);
         String type = inputJson.get(OPERATION_TYPE).textValue();
 
@@ -182,9 +183,11 @@ private String processDelete(JsonNode inputJson, ObjectNode result) {
    * @param table table the data belong to
    */
   private boolean isTargetTable(String database, String table) {
-    String databaseRegex = this.props.getString(Config.DATABASE_NAME_REGEX_PROP.key());
-    String tableRegex = this.props.getString(Config.TABLE_NAME_REGEX_PROP.key());
-    return Pattern.matches(databaseRegex, database) && Pattern.matches(tableRegex, table);
+    if (!tableRegex.matcher(table).matches()) {
+      return false;
+    }
+    // no database regex configured: the table regex alone decides
+    return !databaseRegex.isPresent() || databaseRegex.get().matcher(database).matches();
   }
 
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
@@ -178,6 +178,12 @@ public void testMaxwellJsonKafkaSourcePostProcessor() throws IOException {
         + "\"name\":\"andy\",\"age\":17,\"insert_time\":\"2022-03-12 08:31:56\","
         + "\"update_time\":\"2022-03-12 08:31:56\"}}";
 
+    // database hudi_02, table hudi_maxwell_01, insert
+    String hudi02Maxwell01Insert = "{\"database\":\"hudi_02\",\"table\":\"hudi_maxwell_01\",\"type\":\"insert\","
+        + "\"ts\":1647073916,\"xid\":4990,\"commit\":true,\"data\":{\"id\":\"9bb17f316ee8488cb107621ddf0f3cb0\","
+        + "\"name\":\"andy\",\"age\":17,\"insert_time\":\"2022-03-12 08:31:56\","
+        + "\"update_time\":\"2022-03-12 08:31:56\"}}";
+
     // ------------------------------------------------------------------------
     // Tests
     // ------------------------------------------------------------------------
@@ -248,6 +254,16 @@ public void testMaxwellJsonKafkaSourcePostProcessor() throws IOException {
     // ddl data will be ignored, ths count should be 0
     long ddlDataNum = processor.process(ddlData).count();
     assertEquals(0, ddlDataNum);
+
+    // test table regex without database regex
+    props.remove(MaxwellJsonKafkaSourcePostProcessor.Config.DATABASE_NAME_REGEX_PROP.key());
+    props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.TABLE_NAME_REGEX_PROP.key(), "hudi_maxwell(_)?[0-9]{0,2}");
+    // the processor fixes its filters at construction time, so the changed props need a new instance
+    MaxwellJsonKafkaSourcePostProcessor tableOnlyProcessor = new MaxwellJsonKafkaSourcePostProcessor(props);
+
+    JavaRDD<String> dataWithoutDatabaseRegex = jsc().parallelize(Arrays.asList(hudiMaxwell01Insert, hudi02Maxwell01Insert));
+    long countWithoutDatabaseRegex = tableOnlyProcessor.process(dataWithoutDatabaseRegex).count();
+    assertEquals(2, countWithoutDatabaseRegex);
   }
 
   /**