DateTimeUtils.java
@@ -21,6 +21,9 @@

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
@@ -126,6 +129,20 @@ private static Map<String, ChronoUnit> initMap() {
return labelToUnit;
}

/**
* Converts a unix timestamp in seconds to a time string in the given format, using the system default zone.
*
* @param unixTimestamp unix timestamp in seconds
* @param timeFormat    target time format pattern, e.g. 'yyyy-MM-dd HH:mm:ss'
* @return the formatted time string
*/
public static String formatUnixTimestamp(long unixTimestamp, String timeFormat) {
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(timeFormat));
DateTimeFormatter dtf = DateTimeFormatter.ofPattern(timeFormat);
return LocalDateTime
.ofInstant(Instant.ofEpochSecond(unixTimestamp), ZoneId.systemDefault())
.format(dtf);
}
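
// Usage sketch (illustrative, not part of this diff; the result assumes a UTC system default zone):
//   DateTimeUtils.formatUnixTimestamp(1609459200L, "yyyy-MM-dd HH:mm:ss")
// returns "2021-01-01 00:00:00".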

/**
* Enum which defines time unit, mostly used to parse value from configuration file.
*/

MaxwellJsonKafkaSourcePostProcessor.java (new file)
@@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.sources.processor.maxwell;

import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.DateTimeUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
import org.apache.hudi.utilities.sources.processor.JsonKafkaSourcePostProcessor;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;

import java.util.Locale;
import java.util.Objects;
import java.util.regex.Pattern;

import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.DATE_STRING;
import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.EPOCHMILLISECONDS;
import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.NON_TIMESTAMP;
import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.UNIX_TIMESTAMP;
import static org.apache.hudi.utilities.sources.processor.maxwell.PreCombineFieldType.valueOf;

/**
* A {@link JsonKafkaSourcePostProcessor} that extracts the fresh data from a Maxwell JSON string and tags each
* record as a delete or not.
*/
public class MaxwellJsonKafkaSourcePostProcessor extends JsonKafkaSourcePostProcessor {

private static final Logger LOG = LogManager.getLogger(MaxwellJsonKafkaSourcePostProcessor.class);

private static final ObjectMapper MAPPER = new ObjectMapper();

public MaxwellJsonKafkaSourcePostProcessor(TypedProperties props) {
super(props);
}

// ------------------------------------------------------------------------
// Partial fields in maxwell json string
// ------------------------------------------------------------------------

private static final String DATABASE = "database";
private static final String TABLE = "table";
private static final String DATA = "data";
private static final String OPERATION_TYPE = "type";
private static final String TS = "ts";

// ------------------------------------------------------------------------
// Operation types
// ------------------------------------------------------------------------

private static final String INSERT = "insert";
private static final String UPDATE = "update";
private static final String DELETE = "delete";

/**
* Configs to be passed for this processor.
*/
public static class Config {
public static final ConfigProperty<String> DATABASE_NAME_REGEX_PROP = ConfigProperty
.key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.database.regex")
.noDefaultValue()
.withDocumentation("Database name regex.");

public static final ConfigProperty<String> TABLE_NAME_REGEX_PROP = ConfigProperty
.key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.table.regex")
.noDefaultValue()
.withDocumentation("Table name regex.");

public static final ConfigProperty<String> PRECOMBINE_FIELD_TYPE_PROP = ConfigProperty
.key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.precombine.field.type")
.defaultValue("DATA_STRING")
.withDocumentation("Data type of the preCombine field. could be NON_TIMESTAMP, DATE_STRING,"
+ "UNIX_TIMESTAMP or EPOCHMILLISECONDS. DATA_STRING by default ");

public static final ConfigProperty<String> PRECOMBINE_FIELD_FORMAT_PROP = ConfigProperty
.key("hoodie.deltastreamer.source.json.kafka.post.processor.maxwell.precombine.field.format")
.defaultValue("yyyy-MM-dd HH:mm:ss")
.withDocumentation("When the preCombine filed is in DATE_STRING format, use should tell hoodie"
+ "what format it is. 'yyyy-MM-dd HH:mm:ss' by default");
}

@Override
public JavaRDD<String> process(JavaRDD<String> maxwellJsonRecords) {
return maxwellJsonRecords.map(record -> {
JsonNode inputJson = MAPPER.readTree(record);
String database = inputJson.get(DATABASE).textValue();
String table = inputJson.get(TABLE).textValue();

// filter out target databases and tables
if (isTargetTable(database, table)) {

LOG.info(String.format("Maxwell source processor starts to process table %s.%s", database, table));

ObjectNode result = (ObjectNode) inputJson.get(DATA);
String type = inputJson.get(OPERATION_TYPE).textValue();

// insert or update
if (INSERT.equals(type) || UPDATE.equals(type)) {
// tag this record not delete.
result.put(HoodieRecord.HOODIE_IS_DELETED, false);
return result.toString();

// delete
} else if (DELETE.equals(type)) {
return processDelete(inputJson, result);
} else {
// there might be some DDL events, ignore them
return null;
}
} else {
// not the data from target table(s), ignore it
return null;
}
}).filter(Objects::nonNull);
}
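
// Illustrative input/output (values invented). Given a Maxwell envelope such as
//   {"database":"hudi_db","table":"orders","type":"insert","ts":1609459200,
//    "data":{"id":1,"update_time":"2021-01-01 00:00:00"}}
// process() keeps only the `data` payload and tags it with HoodieRecord.HOODIE_IS_DELETED, emitting:
//   {"id":1,"update_time":"2021-01-01 00:00:00","_hoodie_is_deleted":false}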

private String processDelete(JsonNode inputJson, ObjectNode result) {
// tag this record as delete.
result.put(HoodieRecord.HOODIE_IS_DELETED, true);

PreCombineFieldType preCombineFieldType =
valueOf(this.props.getString(Config.PRECOMBINE_FIELD_TYPE_PROP.key(),
Config.PRECOMBINE_FIELD_TYPE_PROP.defaultValue()).toUpperCase(Locale.ROOT));

// Maxwell won't update the `update_time` (delete time) field of a record that is tagged as a delete, so if we
// want this delete to take effect correctly, we should set its `update_time` to a time close to when the delete
// operation actually occurred. Here we use `ts` from the Maxwell JSON string as this 'delete' time.

// we can update the `update_time` (delete time) only when it is in a timestamp format.
if (!preCombineFieldType.equals(NON_TIMESTAMP)) {
String preCombineField = this.props.getString(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(),
HoodieWriteConfig.PRECOMBINE_FIELD_NAME.defaultValue());

// ts from maxwell
long ts = inputJson.get(TS).longValue();

// convert the `update_time`(delete time) to the proper format.
if (preCombineFieldType.equals(DATE_STRING)) {
// DATE_STRING format
String timeFormat = this.props.getString(Config.PRECOMBINE_FIELD_FORMAT_PROP.key(), Config.PRECOMBINE_FIELD_FORMAT_PROP.defaultValue());
result.put(preCombineField, DateTimeUtils.formatUnixTimestamp(ts, timeFormat));
} else if (preCombineFieldType.equals(EPOCHMILLISECONDS)) {
// EPOCHMILLISECONDS format
result.put(preCombineField, ts * 1000L);
} else if (preCombineFieldType.equals(UNIX_TIMESTAMP)) {
// UNIX_TIMESTAMP format
result.put(preCombineField, ts);
} else {
throw new HoodieSourcePostProcessException("Unsupported preCombine time format " + preCombineFieldType);
}
}
return result.toString();
}
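
// Worked example (values invented; the DATE_STRING output assumes a UTC system zone): for a delete
// event with ts = 1609459200 and preCombine field `update_time`, the field is rewritten as
//   DATE_STRING       -> "2021-01-01 00:00:00"
//   EPOCHMILLISECONDS -> 1609459200000
//   UNIX_TIMESTAMP    -> 1609459200
// and HoodieRecord.HOODIE_IS_DELETED is set to true.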

/**
* Checks whether this record comes from a table we want to consume data from.
*
* @param database database the data belongs to
* @param table    table the data belongs to
* @return true if both database and table match the configured regexes
*/
private boolean isTargetTable(String database, String table) {
String databaseRegex = this.props.getString(Config.DATABASE_NAME_REGEX_PROP.key());
String tableRegex = this.props.getString(Config.TABLE_NAME_REGEX_PROP.key());
return Pattern.matches(databaseRegex, database) && Pattern.matches(tableRegex, table);
}
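
// Note: Pattern.matches requires the regex to match the entire name, e.g.
// Pattern.matches("order.*", "orders") is true, while Pattern.matches("order", "orders") is false.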

}
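
A minimal end-to-end usage sketch (not part of this diff; the class name MaxwellPostProcessorExample, the regex and field values, and inputRdd are invented for illustration):

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.sources.processor.maxwell.MaxwellJsonKafkaSourcePostProcessor;

import org.apache.spark.api.java.JavaRDD;

public class MaxwellPostProcessorExample {

  // `inputRdd` is assumed to hold raw Maxwell JSON strings read from Kafka.
  static JavaRDD<String> process(JavaRDD<String> inputRdd) {
    TypedProperties props = new TypedProperties();
    props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.DATABASE_NAME_REGEX_PROP.key(), "hudi_db");
    props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.TABLE_NAME_REGEX_PROP.key(), "orders");
    props.setProperty(MaxwellJsonKafkaSourcePostProcessor.Config.PRECOMBINE_FIELD_TYPE_PROP.key(), "UNIX_TIMESTAMP");
    props.setProperty(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "update_time");
    return new MaxwellJsonKafkaSourcePostProcessor(props).process(inputRdd);
  }
}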

PreCombineFieldType.java (new file)
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.sources.processor.maxwell;

/**
* Enum of preCombine field time type.
*/
public enum PreCombineFieldType {
/**
* Not a timestamp type field.
*/
NON_TIMESTAMP,

/**
* Timestamp type field in string format, e.g. '2021-01-01 00:00:00'.
*/
DATE_STRING,

/**
* Timestamp type field in unix timestamp format (seconds since the epoch), e.g. 1609459200.
*/
UNIX_TIMESTAMP,

/**
* Timestamp type field in epoch milliseconds format, e.g. 1609459200000.
*/
EPOCHMILLISECONDS
}