Skip to content

Commit

Permalink
[ISSUE apache#4439]Support source connector jdbc handle CDC for mysql
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Sep 24, 2023
1 parent f8d4023 commit 305524a
Show file tree
Hide file tree
Showing 32 changed files with 2,010 additions and 85 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.utils;

import org.slf4j.Logger;
import org.slf4j.Marker;

import lombok.experimental.UtilityClass;

@UtilityClass
public final class LogUtils {

public static boolean isTraceEnabled(Logger logger) {
return logger.isTraceEnabled();
}

public static void trace(Logger logger, String msg) {
if (isTraceEnabled(logger)) {
logger.trace(msg);
}
}

public static void trace(Logger logger, String format, Object arg) {
if (isTraceEnabled(logger)) {
logger.trace(format, arg);
}
}

public static void trace(Logger logger, String format, Object arg1, Object arg2) {
if (isTraceEnabled(logger)) {
logger.trace(format, arg1, arg2);
}
}

public static void trace(Logger logger, String format, Object... arguments) {
if (isTraceEnabled(logger)) {
logger.trace(format, arguments);
}
}

public static void trace(Logger logger, String msg, Throwable t) {
if (isTraceEnabled(logger)) {
logger.trace(msg, t);
}
}

public static boolean isTraceEnabled(Logger logger, Marker marker) {
return logger.isTraceEnabled(marker);
}

public static void trace(Logger logger, Marker marker, String msg) {
if (isTraceEnabled(logger, marker)) {
logger.trace(marker, msg);
}
}

public static void trace(Logger logger, Marker marker, String format, Object arg) {
if (isTraceEnabled(logger, marker)) {
logger.trace(marker, format, arg);
}
}

public static boolean isDebugEnabled(Logger logger) {
return logger.isDebugEnabled();
}

public static void debug(Logger logger, String msg) {
if (isDebugEnabled(logger)) {
logger.debug(msg);
}
}

public static void debug(Logger logger, String format, Object arg) {
if (isDebugEnabled(logger)) {
logger.debug(format, arg);
}
}

public static void debug(Logger logger, String format, Object arg1, Object arg2) {
if (isDebugEnabled(logger)) {
logger.debug(format, arg1, arg2);
}
}

public static void debug(Logger logger, String format, Object... arguments) {
if (isDebugEnabled(logger)) {
logger.debug(format, arguments);
}
}

public static void debug(Logger logger, String msg, Throwable t) {
if (isDebugEnabled(logger)) {
logger.debug(msg, t);
}
}

public static boolean isInfoEnabled(Logger logger) {
return logger.isInfoEnabled();
}

public static void info(Logger logger, String msg) {
if (isInfoEnabled(logger)) {
logger.info(msg);
}
}

public static void info(Logger logger, String format, Object arg) {
if (isInfoEnabled(logger)) {
logger.info(format, arg);
}
}

public static void info(Logger logger, String format, Object arg1, Object arg2) {
if (isInfoEnabled(logger)) {
logger.info(format, arg1, arg2);
}
}

public static void info(Logger logger, String format, Object... arguments) {
if (isInfoEnabled(logger)) {
logger.info(format, arguments);
}
}

public static void info(Logger logger, String msg, Throwable t) {
if (isInfoEnabled(logger)) {
logger.info(msg, t);
}
}

public static boolean isWarnEnabled(Logger logger) {
return logger.isWarnEnabled();
}

public static void warn(Logger logger, String msg) {
if (isWarnEnabled(logger)) {
logger.warn(msg);
}
}

public static void warn(Logger logger, String format, Object arg) {
if (isWarnEnabled(logger)) {
logger.warn(format, arg);
}
}

public static void warn(Logger logger, String format, Object arg1, Object arg2) {
if (isWarnEnabled(logger)) {
logger.warn(format, arg1, arg2);
}
}

public static void warn(Logger logger, String format, Object... arguments) {
if (isWarnEnabled(logger)) {
logger.warn(format, arguments);
}
}

public static void warn(Logger logger, String msg, Throwable t) {
if (isWarnEnabled(logger)) {
logger.warn(msg, t);
}
}

public static boolean isErrorEnabled(Logger logger) {
return logger.isErrorEnabled();
}

public static void error(Logger logger, String msg) {
if (isErrorEnabled(logger)) {
logger.error(msg);
}
}

public static void error(Logger logger, String format, Object arg) {
if (isErrorEnabled(logger)) {
logger.error(format, arg);
}
}

public static void error(Logger logger, String format, Object arg1, Object arg2) {
if (isErrorEnabled(logger)) {
logger.error(format, arg1, arg2);
}
}

public static void error(Logger logger, String format, Object... arguments) {
if (isErrorEnabled(logger)) {
logger.error(format, arguments);
}
}

public static void error(Logger logger, String msg, Throwable t) {
if (isErrorEnabled(logger)) {
logger.error(msg, t);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.jdbc;

import lombok.Data;

@Data
public class DataChanges {

private Object after;

private Object before;

private String type;

public DataChanges(Object after, Object before) {
this.after = after;
this.before = before;
}

public DataChanges(Object after, Object before, String type) {
this.after = after;
this.before = before;
this.type = type;
}

public static Builder newBuilder() {
return new Builder();
}

public static class Builder {

private String type;
private Object after;
private Object before;

public Builder withType(String type) {
this.type = type;
return this;
}

public Builder withAfter(Object after) {
this.after = after;
return this;
}

public Builder withBefore(Object before) {
this.before = before;
return this;
}

public DataChanges build() {
return new DataChanges(after, before, type);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,23 @@

public final class JdbcConnectData {

public static final byte DATA_CHANGES = 1;

public static final byte SCHEMA_CHANGES = 1 << 1;

private Payload payload = new Payload();

private Schema schema;

private byte type;

public JdbcConnectData() {
}

public JdbcConnectData(byte type) {
this.type = type;
}

public Payload getPayload() {
return payload;
}
Expand All @@ -38,4 +51,20 @@ public Schema getSchema() {
public void setSchema(Schema schema) {
this.schema = schema;
}

public byte getType() {
return type;
}

public void setType(byte type) {
this.type = type;
}

public void markDataChanges() {
this.type |= DATA_CHANGES;
}

public void markSchemaChanges() {
this.type |= SCHEMA_CHANGES;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@

public final class Payload extends HashMap<String, Object> {

public static final String AFTER_FIELD = "after";

public static final String BEFORE_FIELD = "before";

public static final String SOURCE = "source";

public static final String DDL = "ddl";


/**
* Constructs an empty <code>HashMap</code> with the default initial capacity (16) and the default load factor (0.75).
*/
Expand All @@ -31,12 +40,12 @@ public Payload() {
}

public Payload withSource(SourceMateData source) {
this.put("source", source);
this.put(SOURCE, source);
return this;
}

public Payload withDdl(String ddl) {
this.put("ddl", ddl);
this.put(DDL, ddl);
return this;
}

Expand All @@ -45,13 +54,13 @@ public Payload withCatalogChanges(CatalogChanges catalogChanges) {
return this;
}

public Payload withAfterValue(Object value) {
this.put("after", value);
public Payload withDataChanges(DataChanges dataChanges) {
this.put("dataChanges", dataChanges);
return this;
}

public SourceMateData ofSourceMateData() {
return (SourceMateData) super.get("source");
return (SourceMateData) super.get(SOURCE);
}

public static Builder builder() {
Expand All @@ -72,7 +81,7 @@ public Builder put(String key, Object value) {
}

public Builder withSource(SourceMateData source) {
payload.put("source", source);
payload.put(SOURCE, source);
return this;
}

Expand Down
Loading

0 comments on commit 305524a

Please sign in to comment.