diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index d719051013ba..c1ea9bb4fe1a 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -18,9 +18,9 @@
name: Backend
on:
push:
+ pull_request:
branches:
- dev
- pull_request:
paths-ignore:
- 'docs/**'
- '**/*.md'
@@ -32,7 +32,7 @@ concurrency:
jobs:
license-header:
- if: github.repository == 'apache/seatunnel'
+ if: github.repository == '${{github.actor}}/seatunnel'
name: License header
runs-on: ubuntu-latest
timeout-minutes: 10
@@ -44,7 +44,7 @@ jobs:
uses: apache/skywalking-eyes@985866ce7e324454f61e22eb2db2e998db09d6f3
code-style:
- if: github.repository == 'apache/seatunnel'
+ if: github.repository == '${{github.actor}}/seatunnel'
name: Code style
runs-on: ubuntu-latest
timeout-minutes: 10
@@ -56,7 +56,7 @@ jobs:
run: ./mvnw --batch-mode --quiet --no-snapshot-updates clean spotless:check
dead-link:
- if: github.repository == 'apache/seatunnel'
+ if: github.repository == '${{github.actor}}/seatunnel'
name: Dead links
runs-on: ubuntu-latest
timeout-minutes: 30
@@ -69,7 +69,7 @@ jobs:
done
sanity-check:
- if: github.repository == 'apache/seatunnel'
+ if: github.repository == '${{github.actor}}/seatunnel'
name: Sanity check results
needs: [ license-header, code-style, dead-link ]
runs-on: ubuntu-latest
@@ -83,8 +83,7 @@ jobs:
changes:
runs-on: ubuntu-latest
- # To prevent error when there's no base branch
- if: github.repository == 'apache/seatunnel'
+ if: github.repository == '${{github.actor}}/seatunnel'
timeout-minutes: 10
outputs:
api: ${{ steps.filter.outputs.api }}
diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md
index 4dbd3a84ce7f..c3bfa4a5dff5 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -32,7 +32,7 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on
Kafka Topic.
-Currently two formats are supported:
+Currently, two formats are supported:
1. Fill in the name of the topic.
@@ -108,7 +108,7 @@ Kafka distinguishes different transactions by different transactionId. This para
### format
-Data format. The default format is json. Optional text format. The default field separator is ",".
+Data format. The default format is json. Optional text, avro format. The default field separator is ",".
If you customize the delimiter, add the "field_delimiter" option.
### field_delimiter
diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md
index 06f60af6d879..a45b61f28581 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -73,7 +73,7 @@ The structure of the data, including field names and field types.
## format
-Data format. The default format is json. Optional text format. The default field separator is ", ".
+Data format. The default format is json. Optional text, avro format. The default field separator is ", ".
If you customize the delimiter, add the "field_delimiter" option.
## format_error_handle_way
diff --git a/seatunnel-connectors-v2/connector-kafka/pom.xml b/seatunnel-connectors-v2/connector-kafka/pom.xml
index 0ce4bba6b171..60dc30492898 100644
--- a/seatunnel-connectors-v2/connector-kafka/pom.xml
+++ b/seatunnel-connectors-v2/connector-kafka/pom.xml
@@ -61,6 +61,11 @@
seatunnel-format-compatible-debezium-json
${project.version}
+
+ org.apache.seatunnel
+ seatunnel-format-avro
+ ${project.version}
+
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
index 65b5cc27699c..04d85c509b26 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
@@ -21,5 +21,6 @@ public enum MessageFormat {
JSON,
TEXT,
CANAL_JSON,
- COMPATIBLE_DEBEZIUM_JSON
+ COMPATIBLE_DEBEZIUM_JSON,
+ AVRO
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 06005de00353..9007e21e671f 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -24,6 +24,7 @@
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
+import org.apache.seatunnel.format.avro.AvroSerializationSchema;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonSerializationSchema;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
@@ -221,6 +222,8 @@ private static SerializationSchema createSerializationSchema(
return new CanalJsonSerializationSchema(rowType);
case COMPATIBLE_DEBEZIUM_JSON:
return new CompatibleDebeziumJsonSerializationSchema(rowType, isKey);
+ case AVRO:
+ return new AvroSerializationSchema(rowType);
default:
throw new SeaTunnelJsonFormatException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index 450ba3f1cde7..61dd15890c68 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -43,7 +43,10 @@ public OptionRule optionRule() {
.conditional(
Config.FORMAT,
Arrays.asList(
- MessageFormat.JSON, MessageFormat.CANAL_JSON, MessageFormat.TEXT),
+ MessageFormat.JSON,
+ MessageFormat.CANAL_JSON,
+ MessageFormat.TEXT,
+ MessageFormat.AVRO),
Config.TOPIC)
.optional(
Config.KAFKA_CONFIG,
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 741d75216439..e4ebdd78f3ee 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -45,6 +45,7 @@
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
+import org.apache.seatunnel.format.avro.AvroDeserializationSchema;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
@@ -266,6 +267,9 @@ private void setDeserialization(Config config) {
.setIgnoreParseErrors(true)
.build();
break;
+ case AVRO:
+ deserializationSchema = new AvroDeserializationSchema(typeInfo);
+ break;
default:
throw new SeaTunnelJsonFormatException(
CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 5d19a35a8740..2894b67dfbf4 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -220,7 +220,8 @@ The text of each license is the standard Apache 2.0 license.
(Apache License 2.0) aircompressor (io.airlift:aircompressor:0.10 - http://github.com/airlift/aircompressor)
(Apache License, Version 2.0) Apache Yetus - Audience Annotations (org.apache.yetus:audience-annotations:0.11.0 - https://yetus.apache.org/audience-annotations)
(The Apache Software License, Version 2.0) Apache Avro (org.apache.avro:avro:1.8.2 - http://avro.apache.org)
- (Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.13 - https://commons.apache.org/proper/commons-codec/)
+ (The Apache Software License, Version 2.0) Apache Avro (org.apache.avro:avro:1.10.2 - http://avro.apache.org)
+ (Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.13 - https://commons.apache.org/proper/commons-codec/)
(Apache License, Version 2.0) Apache Commons Collections (org.apache.commons:commons-collections4:4.4 - https://commons.apache.org/proper/commons-collections/)
(Apache License, Version 2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.20 - https://commons.apache.org/proper/commons-compress/)
(The Apache Software License, Version 2.0) Commons Lang (commons-lang:commons-lang:2.6 - http://commons.apache.org/lang/)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 922798c3ded2..7e2bf30c93f9 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -275,6 +275,28 @@ public void testSourceKafkaStartConfig(TestContainer container)
testKafkaGroupOffsetsToConsole(container);
}
+ @TestTemplate
+ public void testFakeSourceToKafkaAvroFormat(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ container.executeJob("/avro/fake_source_to_kafka_avro_format.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ }
+
+ @TestTemplate
+ public void testKafkaAvroToConsole(TestContainer container)
+ throws IOException, InterruptedException {
+ DefaultSeaTunnelRowSerializer serializer =
+ DefaultSeaTunnelRowSerializer.create(
+ "test_avro_topic",
+ SEATUNNEL_ROW_TYPE,
+ MessageFormat.AVRO,
+ DEFAULT_FIELD_DELIMITER);
+ generateTestData(row -> serializer.serializeRow(row), 0, 100);
+ Container.ExecResult execResult = container.executeJob("/avro/kafka_avro_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+ }
+
public void testKafkaLatestToConsole(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
new file mode 100644
index 000000000000..c6f5d6944f6f
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/fake_source_to_kafka_avro_format.conf
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ #spark config
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ c_row = {
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_avro_topic"
+ format = avro
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf
new file mode 100644
index 000000000000..3fc1a57c3d7c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/avro/kafka_avro_to_console.conf
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Kafka {
+ bootstrap.servers = "kafkaCluster:9092"
+ topic = "test_avro_topic"
+ result_table_name = "kafka_table"
+ kafka.auto.offset.reset = "earliest"
+ format = avro
+ format_error_handle_way = skip
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map"
+ c_array = "array"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ }
+}
+
+sink {
+ Console {
+ source_table_name = "kafka_table"
+ }
+ Assert {
+ source_table_name = "kafka_table"
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-formats/pom.xml b/seatunnel-formats/pom.xml
index 983a8629ce89..8320a84bbace 100644
--- a/seatunnel-formats/pom.xml
+++ b/seatunnel-formats/pom.xml
@@ -30,6 +30,7 @@
seatunnel-format-json
seatunnel-format-text
seatunnel-format-compatible-debezium-json
+ seatunnel-format-avro
diff --git a/seatunnel-formats/seatunnel-format-avro/pom.xml b/seatunnel-formats/seatunnel-format-avro/pom.xml
new file mode 100644
index 000000000000..232af650c59f
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-avro/pom.xml
@@ -0,0 +1,45 @@
+
+
+
+ 4.0.0
+
+ org.apache.seatunnel
+ seatunnel-formats
+ ${revision}
+
+
+ seatunnel-format-avro
+ SeaTunnel : Formats : Avro
+
+
+ 1.10.2
+
+
+
+
+ org.apache.seatunnel
+ seatunnel-api
+ ${project.version}
+
+
+ org.apache.avro
+ avro
+ ${avro.version}
+
+
+
+
diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java
new file mode 100644
index 000000000000..3d2c3fba5956
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroDeserializationSchema.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+
+import java.io.IOException;
+
+public class AvroDeserializationSchema implements DeserializationSchema {
+
+ private static final long serialVersionUID = -7907358485475741366L;
+
+ private final SeaTunnelRowType rowType;
+ private final AvroToRowConverter converter;
+
+ public AvroDeserializationSchema(SeaTunnelRowType rowType) {
+ this.rowType = rowType;
+ this.converter = new AvroToRowConverter();
+ }
+
+ @Override
+ public SeaTunnelRow deserialize(byte[] message) throws IOException {
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(message, null);
+ GenericRecord record = this.converter.getReader().read(null, decoder);
+ return converter.converter(record, rowType);
+ }
+
+ @Override
+ public SeaTunnelDataType getProducedType() {
+ return this.rowType;
+ }
+}
diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java
new file mode 100644
index 000000000000..92d37bec6018
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroSerializationSchema.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.avro.exception.AvroFormatErrorCode;
+import org.apache.seatunnel.format.avro.exception.SeaTunnelAvroFormatException;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class AvroSerializationSchema implements SerializationSchema {
+
+ private static final long serialVersionUID = 4438784443025715370L;
+
+ private final ByteArrayOutputStream out;
+ private final BinaryEncoder encoder;
+ private final RowToAvroConverter converter;
+ private final DatumWriter writer;
+
+ public AvroSerializationSchema(SeaTunnelRowType rowType) {
+ this.out = new ByteArrayOutputStream();
+ this.encoder = EncoderFactory.get().binaryEncoder(out, null);
+ this.converter = new RowToAvroConverter(rowType);
+ this.writer = this.converter.getWriter();
+ }
+
+ @Override
+ public byte[] serialize(SeaTunnelRow element) {
+ GenericRecord record = converter.convertRowToGenericRecord(element);
+ try {
+ out.reset();
+ writer.write(record, encoder);
+ encoder.flush();
+ return out.toByteArray();
+ } catch (IOException e) {
+ throw new SeaTunnelAvroFormatException(
+ AvroFormatErrorCode.SERIALIZATION_ERROR, e.toString());
+ }
+ }
+}
diff --git a/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
new file mode 100644
index 000000000000..898793c2413a
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-avro/src/main/java/org/apache/seatunnel/format/avro/AvroToRowConverter.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.avro;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.format.avro.exception.AvroFormatErrorCode;
+import org.apache.seatunnel.format.avro.exception.SeaTunnelAvroFormatException;
+
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+
+public class AvroToRowConverter implements Serializable {
+
+ private static final long serialVersionUID = 8177020083886379563L;
+
+ private DatumReader reader = null;
+
+ public AvroToRowConverter() {}
+
+ public DatumReader getReader() {
+ if (reader == null) {
+ reader = createReader();
+ }
+ return reader;
+ }
+
+ private DatumReader createReader() {
+ GenericDatumReader datumReader = new GenericDatumReader<>();
+ datumReader.getData().addLogicalTypeConversion(new Conversions.DecimalConversion());
+ datumReader.getData().addLogicalTypeConversion(new TimeConversions.DateConversion());
+ datumReader
+ .getData()
+ .addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
+ return datumReader;
+ }
+
+ public SeaTunnelRow converter(GenericRecord record, SeaTunnelRowType rowType) {
+ String[] fieldNames = rowType.getFieldNames();
+
+ Object[] values = new Object[fieldNames.length];
+ for (int i = 0; i < fieldNames.length; i++) {
+ if (record.getSchema().getField(fieldNames[i]) == null) {
+ values[i] = null;
+ continue;
+ }
+ values[i] =
+ convertField(
+ rowType.getFieldType(i),
+ record.getSchema().getField(fieldNames[i]),
+ record.get(fieldNames[i]));
+ }
+ return new SeaTunnelRow(values);
+ }
+
+ private Object convertField(SeaTunnelDataType> dataType, Schema.Field field, Object val) {
+ switch (dataType.getSqlType()) {
+ case MAP:
+ case STRING:
+ case BOOLEAN:
+ case SMALLINT:
+ case INT:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case NULL:
+ case BYTES:
+ return val;
+ case TINYINT:
+ Class> typeClass = dataType.getTypeClass();
+ if (typeClass == Byte.class) {
+ Integer integer = (Integer) val;
+ return integer.byteValue();
+ }
+ return val;
+ case ARRAY:
+ BasicType> basicType = ((ArrayType, ?>) dataType).getElementType();
+ List