Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.model.debezium;

import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;

/**
 * Base class that provides support for seamlessly applying changes captured via Debezium.
 * <p>
 * Debezium change event types are determined by the op field in the payload
 * <p>
 * - For inserts, op=c
 * - For deletes, op=d
 * - For updates, op=u
 * - For snapshot inserts, op=r
 * <p>
 * This payload implementation will issue matching insert, delete, updates against the hudi table
 */
public abstract class AbstractDebeziumAvroPayload extends OverwriteWithLatestAvroPayload {

  private static final Logger LOG = LogManager.getLogger(AbstractDebeziumAvroPayload.class);

  /**
   * Creates a payload from an incoming record and its ordering value; both are passed to the parent payload.
   *
   * @param record      the incoming change record
   * @param orderingVal the ordering value associated with the record
   */
  public AbstractDebeziumAvroPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  /**
   * Creates a payload from an optional record, which is passed to the parent payload.
   *
   * @param record the incoming change record, possibly absent
   */
  public AbstractDebeziumAvroPayload(Option<GenericRecord> record) {
    super(record);
  }

  /**
   * Returns the record to write for this payload.
   *
   * @param schema the schema used by the parent payload to materialize the record
   * @return an empty Option when the parent payload yields no record or when the record is a
   *         Debezium delete event; otherwise the materialized record
   * @throws IOException if the parent payload fails to materialize the record
   */
  @Override
  public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
    Option<IndexedRecord> insertRecord = getInsertRecord(schema);
    // An absent record is propagated as-is instead of being unwrapped, which would throw.
    return insertRecord.isPresent() ? handleDeleteOperation(insertRecord.get()) : Option.empty();
  }

  /**
   * Decides between the record currently in storage and the incoming record.
   *
   * @param currentValue the record currently in storage
   * @param schema       the schema used by the parent payload to materialize the incoming record
   * @return the stored record when {@link #shouldPickCurrentRecord} says it is the latest; otherwise the
   *         result of {@link #getInsertValue(Schema)}, which is empty for delete events and for payloads
   *         that yield no record
   * @throws IOException if the incoming record cannot be materialized
   */
  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
    Option<IndexedRecord> insertRecord = getInsertRecord(schema);
    // Without an incoming record there is nothing to compare against the stored one.
    if (!insertRecord.isPresent()) {
      return Option.empty();
    }
    // Step 1: If the time occurrence of the current record in storage is higher than the time occurrence of the
    // insert record (including a delete record), pick the current record.
    if (shouldPickCurrentRecord(currentValue, insertRecord.get(), schema)) {
      return Option.of(currentValue);
    }
    // Step 2: Pick the insert record (as a delete record if it is a delete event).
    // Goes through getInsertValue so that subclass overrides are honoured.
    return getInsertValue(schema);
  }

  /**
   * Tells whether the record in storage should be kept over the incoming record.
   *
   * @param currentRecord the record currently in storage
   * @param insertRecord  the incoming record (may represent a delete event)
   * @param schema        the schema passed to {@link #combineAndGetUpdateValue}
   * @return {@code true} to keep the stored record, {@code false} to apply the incoming one
   * @throws IOException if the comparison cannot be carried out
   */
  protected abstract boolean shouldPickCurrentRecord(IndexedRecord currentRecord, IndexedRecord insertRecord, Schema schema) throws IOException;

  /**
   * Maps a Debezium delete event to an empty Option.
   *
   * @param insertRecord the materialized incoming record
   * @return empty when the record is a {@link GenericRecord} whose operation column equals
   *         {@link DebeziumConstants#DELETE_OP} (case-insensitively); otherwise the record itself
   */
  private Option<IndexedRecord> handleDeleteOperation(IndexedRecord insertRecord) {
    boolean delete = false;
    if (insertRecord instanceof GenericRecord) {
      GenericRecord record = (GenericRecord) insertRecord;
      Object value = record.get(DebeziumConstants.FLATTENED_OP_COL_NAME);
      delete = value != null && value.toString().equalsIgnoreCase(DebeziumConstants.DELETE_OP);
    }

    return delete ? Option.empty() : Option.of(insertRecord);
  }

  /**
   * Materializes the incoming record through the parent payload, without delete handling.
   *
   * @param schema the schema used by the parent payload
   * @return the parent's result, which may be empty
   * @throws IOException if the parent payload fails to materialize the record
   */
  private Option<IndexedRecord> getInsertRecord(Schema schema) throws IOException {
    return super.getInsertValue(schema);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.model.debezium;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Constants used by {@code DebeziumSource} and the Debezium Avro payloads
 * (see {@link AbstractDebeziumAvroPayload}).
 */
public class DebeziumConstants {

  // INPUT COLUMNS
  public static final String INCOMING_BEFORE_FIELD = "before";
  public static final String INCOMING_AFTER_FIELD = "after";
  public static final String INCOMING_SOURCE_FIELD = "source";
  public static final String INCOMING_OP_FIELD = "op";
  public static final String INCOMING_TS_MS_FIELD = "ts_ms";

  public static final String INCOMING_SOURCE_NAME_FIELD = "source.name";
  public static final String INCOMING_SOURCE_TS_MS_FIELD = "source.ts_ms";
  public static final String INCOMING_SOURCE_TXID_FIELD = "source.txId";

  // INPUT COLUMNS SPECIFIC TO MYSQL
  public static final String INCOMING_SOURCE_FILE_FIELD = "source.file";
  public static final String INCOMING_SOURCE_POS_FIELD = "source.pos";
  public static final String INCOMING_SOURCE_ROW_FIELD = "source.row";

  // INPUT COLUMNS SPECIFIC TO POSTGRES
  public static final String INCOMING_SOURCE_LSN_FIELD = "source.lsn";
  public static final String INCOMING_SOURCE_XMIN_FIELD = "source.xmin";

  // OUTPUT COLUMNS
  public static final String FLATTENED_OP_COL_NAME = "_change_operation_type";
  public static final String UPSTREAM_PROCESSING_TS_COL_NAME = "_upstream_event_processed_ts_ms";
  public static final String FLATTENED_SHARD_NAME = "db_shard_source_partition";
  public static final String FLATTENED_TS_COL_NAME = "_event_origin_ts_ms";
  public static final String FLATTENED_TX_ID_COL_NAME = "_event_tx_id";

  // OUTPUT COLUMNS SPECIFIC TO MYSQL
  public static final String FLATTENED_FILE_COL_NAME = "_event_bin_file";
  public static final String FLATTENED_POS_COL_NAME = "_event_pos";
  public static final String FLATTENED_ROW_COL_NAME = "_event_row";
  public static final String ADDED_SEQ_COL_NAME = "_event_seq";

  // OUTPUT COLUMNS SPECIFIC TO POSTGRES
  public static final String FLATTENED_LSN_COL_NAME = "_event_lsn";
  public static final String FLATTENED_XMIN_COL_NAME = "_event_xmin";

  // Other Constants
  // Value of the operation column that marks a delete event
  public static final String DELETE_OP = "d";

  // List of meta data columns. The list itself is unmodifiable and the reference is final,
  // so it cannot be altered or swapped out by other code.
  // NOTE(review): the MySQL-specific output columns (_event_bin_file, _event_pos, _event_row, _event_seq)
  // are not part of this list while the Postgres-specific ones are -- confirm this is intentional.
  public static final List<String> META_COLUMNS = Collections.unmodifiableList(Arrays.asList(
      FLATTENED_OP_COL_NAME,
      UPSTREAM_PROCESSING_TS_COL_NAME,
      FLATTENED_TS_COL_NAME,
      FLATTENED_TX_ID_COL_NAME,
      FLATTENED_LSN_COL_NAME,
      FLATTENED_XMIN_COL_NAME,
      FLATTENED_SHARD_NAME
  ));
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.model.debezium;

import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;

/**
 * Provides support for seamlessly applying changes captured via Debezium for MysqlDB.
 * <p>
 * Debezium change event types are determined by the op field in the payload
 * <p>
 * - For inserts, op=c
 * - For deletes, op=d
 * - For updates, op=u
 * - For snapshot inserts, op=r
 * <p>
 * This payload implementation will issue matching insert, delete, updates against the hudi table
 */
public class MySqlDebeziumAvroPayload extends AbstractDebeziumAvroPayload {

  private static final Logger LOG = LogManager.getLogger(MySqlDebeziumAvroPayload.class);

  /**
   * Creates a payload from an incoming record and its ordering value; both are passed to the parent payload.
   *
   * @param record      the incoming change record
   * @param orderingVal the ordering value associated with the record
   */
  public MySqlDebeziumAvroPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  /**
   * Creates a payload from an optional record, which is passed to the parent payload.
   *
   * @param record the incoming change record, possibly absent
   */
  public MySqlDebeziumAvroPayload(Option<GenericRecord> record) {
    super(record);
  }

  /**
   * Reads the {@link DebeziumConstants#ADDED_SEQ_COL_NAME} column of a record.
   *
   * @param record the record to read from; must be a {@link GenericRecord}
   * @return the column value as a string, or {@code null} when the column holds no value
   */
  private String extractSeq(IndexedRecord record) {
    Object value = ((GenericRecord) record).get(DebeziumConstants.ADDED_SEQ_COL_NAME);
    return value == null ? null : value.toString();
  }

  /**
   * Keeps the stored record only when its sequence value orders after the incoming record's.
   *
   * @param currentRecord the record currently in storage
   * @param insertRecord  the incoming record
   * @param schema        unused by this implementation
   * @return {@code true} when the incoming sequence compares lower than the stored one; {@code false}
   *         otherwise, including when the stored record carries no sequence value
   * @throws IllegalStateException if the incoming record carries no sequence value
   */
  @Override
  protected boolean shouldPickCurrentRecord(IndexedRecord currentRecord, IndexedRecord insertRecord, Schema schema) throws IOException {
    String insertSourceSeq = extractSeq(insertRecord);
    if (insertSourceSeq == null) {
      throw new IllegalStateException(
          String.format("%s cannot be null in the insert record", DebeziumConstants.ADDED_SEQ_COL_NAME));
    }

    String currentSourceSeq = extractSeq(currentRecord);
    // A stored row without a sequence value (e.g. written before this column was populated)
    // cannot be shown to be newer, so the incoming record is applied.
    if (currentSourceSeq == null) {
      return false;
    }

    // Pick the current value in storage only if its Seq (file+pos) is latest
    // compared to the Seq (file+pos) of the insert value.
    // NOTE(review): this is a lexicographic String comparison; if the sequence embeds
    // non-zero-padded numbers, "x.10" sorts before "x.9" -- confirm the format written upstream.
    return insertSourceSeq.compareTo(currentSourceSeq) < 0;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.model.debezium;

import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Provides support for seamlessly applying changes captured via Debezium for PostgresDB.
 * <p>
 * Debezium change event types are determined by the op field in the payload
 * <p>
 * - For inserts, op=c
 * - For deletes, op=d
 * - For updates, op=u
 * - For snapshot inserts, op=r
 * <p>
 * This payload implementation will issue matching insert, delete, updates against the hudi table
 */
public class PostgresDebeziumAvroPayload extends AbstractDebeziumAvroPayload {

  private static final Logger LOG = LogManager.getLogger(PostgresDebeziumAvroPayload.class);
  /** Placeholder that stands in for a column value that was not shipped with the change event. */
  public static final String DEBEZIUM_TOASTED_VALUE = "__debezium_unavailable_value";

  /**
   * Creates a payload from an incoming record and its ordering value; both are passed to the parent payload.
   *
   * @param record      the incoming change record
   * @param orderingVal the ordering value associated with the record
   */
  public PostgresDebeziumAvroPayload(GenericRecord record, Comparable orderingVal) {
    super(record, orderingVal);
  }

  /**
   * Creates a payload from an optional record, which is passed to the parent payload.
   *
   * @param record the incoming change record, possibly absent
   */
  public PostgresDebeziumAvroPayload(Option<GenericRecord> record) {
    super(record);
  }

  /**
   * Reads the {@link DebeziumConstants#FLATTENED_LSN_COL_NAME} column of a record.
   *
   * @param record the record to read from; must be a {@link GenericRecord}
   * @return the LSN, or {@code null} when the column holds no value
   */
  private Long extractLSN(IndexedRecord record) {
    GenericRecord genericRecord = (GenericRecord) record;
    return (Long) genericRecord.get(DebeziumConstants.FLATTENED_LSN_COL_NAME);
  }

  /**
   * Keeps the stored record only when its LSN is greater than the incoming record's LSN.
   *
   * @param currentRecord the record currently in storage
   * @param insertRecord  the incoming record
   * @param schema        unused by this implementation
   * @return {@code true} when the incoming LSN is lower than the stored one; {@code false} otherwise,
   *         including when the stored record carries no LSN
   * @throws IllegalStateException if the incoming record carries no LSN
   */
  @Override
  protected boolean shouldPickCurrentRecord(IndexedRecord currentRecord, IndexedRecord insertRecord, Schema schema) throws IOException {
    Long insertSourceLSN = extractLSN(insertRecord);
    if (insertSourceLSN == null) {
      throw new IllegalStateException(
          String.format("%s cannot be null in the insert record", DebeziumConstants.FLATTENED_LSN_COL_NAME));
    }

    Long currentSourceLSN = extractLSN(currentRecord);
    // Pick the current value in storage only if its LSN is latest compared to the LSN of the insert value.
    // A stored row without an LSN cannot be shown to be newer, so the incoming record is applied.
    return currentSourceLSN != null && insertSourceLSN < currentSourceLSN;
  }

  /**
   * Combines the stored and incoming records and back-fills toasted columns of the incoming record
   * with the values from the stored record.
   *
   * @param currentValue the record currently in storage
   * @param schema       the schema used to materialize the incoming record
   * @return the record to write, or empty when the parent resolves to a delete
   * @throws IOException if the incoming record cannot be materialized
   */
  @Override
  public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
    // Specific to Postgres: If the updated record has TOASTED columns,
    // we will need to keep the previous value for those columns
    // see https://debezium.io/documentation/reference/connectors/postgresql.html#postgresql-toasted-values
    Option<IndexedRecord> insertOrDeleteRecord = super.combineAndGetUpdateValue(currentValue, schema);

    // When the stored record itself was picked, copying its values onto itself changes nothing.
    if (insertOrDeleteRecord.isPresent() && insertOrDeleteRecord.get() != currentValue) {
      mergeToastedValuesIfPresent(insertOrDeleteRecord.get(), currentValue);
    }
    return insertOrDeleteRecord;
  }

  /**
   * Replaces every toasted placeholder in the incoming record with the stored record's value
   * for the same field name. The incoming record is modified in place.
   *
   * @param incomingRecord the record about to be written; must be a {@link GenericData.Record}
   * @param currentRecord  the record currently in storage; must be a {@link GenericData.Record}
   *                       whenever a placeholder has to be replaced
   */
  private void mergeToastedValuesIfPresent(IndexedRecord incomingRecord, IndexedRecord currentRecord) {
    GenericData.Record incoming = (GenericData.Record) incomingRecord;
    List<Schema.Field> fields = incoming.getSchema().getFields();

    for (Schema.Field field : fields) {
      // There are only four avro data types that have unconstrained sizes, which are
      // NON-NULLABLE STRING, NULLABLE STRING, NON-NULLABLE BYTES, NULLABLE BYTES
      Object value = incoming.get(field.pos());
      if (value != null && (containsStringToastedValues(value, field) || containsBytesToastedValues(value, field))) {
        incoming.put(field.pos(), ((GenericData.Record) currentRecord).get(field.name()));
      }
    }
  }

  /**
   * Tells whether a schema is of the given type or is a union with a branch of the given type.
   *
   * @param schema the field schema to inspect
   * @param type   the type to look for
   * @return {@code true} if the schema can hold a value of the given type
   */
  private static boolean schemaAllowsType(Schema schema, Schema.Type type) {
    return schema.getType() == type
        || (schema.getType() == Schema.Type.UNION && schema.getTypes().stream().anyMatch(s -> s.getType() == type));
  }

  /**
   * Returns true if a column is either of type string or a union containing a string branch,
   * and its value is the debezium toasted placeholder.
   *
   * @param value the non-null value of the column in the incoming record
   * @param field the column of interest
   * @return {@code true} if the value is textual and equals {@link #DEBEZIUM_TOASTED_VALUE}
   */
  private boolean containsStringToastedValues(Object value, Schema.Field field) {
    // A union may hold a value from another branch, so the runtime type is checked before casting.
    if (!(value instanceof CharSequence) || !schemaAllowsType(field.schema(), Schema.Type.STRING)) {
      return false;
    }
    CharSequence text = (CharSequence) value;
    // Check length first as an optimization
    return text.length() == DEBEZIUM_TOASTED_VALUE.length()
        && DEBEZIUM_TOASTED_VALUE.equals(text.toString());
  }

  /**
   * Returns true if a column is either of type bytes or a union containing a bytes branch,
   * and its value is the debezium toasted placeholder encoded as UTF-8.
   *
   * @param value the non-null value of the column in the incoming record
   * @param field the column of interest
   * @return {@code true} if the readable bytes of the value decode to {@link #DEBEZIUM_TOASTED_VALUE}
   */
  private boolean containsBytesToastedValues(Object value, Schema.Field field) {
    // A union may hold a value from another branch, so the runtime type is checked before casting.
    if (!(value instanceof ByteBuffer) || !schemaAllowsType(field.schema(), Schema.Type.BYTES)) {
      return false;
    }
    // Work on a duplicate so the caller's position/limit stay untouched, and look only at the
    // readable window: array() would expose the whole backing array (which can be larger than
    // the value) and fails for direct or read-only buffers.
    ByteBuffer view = ((ByteBuffer) value).duplicate();
    // Check length first as an optimization
    return view.remaining() == DEBEZIUM_TOASTED_VALUE.length()
        && DEBEZIUM_TOASTED_VALUE.equals(StandardCharsets.UTF_8.decode(view).toString());
  }
}

Loading