Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Doris Connector] Unified serialization method,Use RowToJsonConverter and TextSerializationSchema #7229

Merged
merged 4 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.converter.TypeConverter;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -37,7 +35,7 @@ public static TypeConverter<BasicTypeDefine> getTypeConverter(@NonNull String do
|| dorisVersion.toLowerCase(Locale.ROOT).startsWith("selectdb-doris-2.")) {
return DorisTypeConverterV2.INSTANCE;
} else {
throw CommonError.unsupportedVersion(DorisConfig.IDENTIFIER, dorisVersion);
return DorisTypeConverterV2.INSTANCE;
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,28 @@

package org.apache.seatunnel.connectors.doris.serialize;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.seatunnel.format.text.TextSerializationSchema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;
import java.util.Arrays;
import java.util.List;

import static com.google.common.base.Preconditions.checkState;
import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
import static org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.CSV;
import static org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.JSON;
import static org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.NULL_VALUE;

public class SeaTunnelRowSerializer extends SeaTunnelRowConverter implements DorisSerializer {
public class SeaTunnelRowSerializer implements DorisSerializer {
String type;
private ObjectMapper objectMapper;
private final SeaTunnelRowType seaTunnelRowType;
private final String fieldDelimiter;
private final boolean enableDelete;
Expand All @@ -51,48 +52,29 @@ public SeaTunnelRowSerializer(
this.seaTunnelRowType = seaTunnelRowType;
this.fieldDelimiter = fieldDelimiter;
this.enableDelete = enableDelete;
if (JSON.equals(type)) {
objectMapper = new ObjectMapper();
}
}

@Override
public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException {
String valString;
if (JSON.equals(type)) {
valString = buildJsonString(seaTunnelRow);
} else if (CSV.equals(type)) {
valString = buildCSVString(seaTunnelRow);
} else {
throw new IllegalArgumentException("The type " + type + " is not supported!");
}
return valString.getBytes(StandardCharsets.UTF_8);
public byte[] buildJsonString(SeaTunnelRow row, SeaTunnelRowType seaTunnelRowType)
throws IOException {

JsonSerializationSchema jsonSerializationSchema =
new JsonSerializationSchema(seaTunnelRowType, NULL_VALUE);
ObjectMapper mapper = jsonSerializationSchema.getMapper();
mapper.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, true);
return jsonSerializationSchema.serialize(row);
}

public String buildJsonString(SeaTunnelRow row) throws IOException {
Map<String, Object> rowMap = new HashMap<>(row.getFields().length);
public byte[] buildCSVString(SeaTunnelRow row, SeaTunnelRowType seaTunnelRowType)
throws IOException {

for (int i = 0; i < row.getFields().length; i++) {
Object value = convert(seaTunnelRowType.getFieldType(i), row.getField(i));
rowMap.put(seaTunnelRowType.getFieldName(i), value);
}
if (enableDelete) {
rowMap.put(LoadConstants.DORIS_DELETE_SIGN, parseDeleteSign(row.getRowKind()));
}
return objectMapper.writeValueAsString(rowMap);
}
TextSerializationSchema build =
TextSerializationSchema.builder()
.seaTunnelRowType(seaTunnelRowType)
.delimiter(fieldDelimiter)
.nullValue(NULL_VALUE)
.build();

public String buildCSVString(SeaTunnelRow row) throws IOException {
StringJoiner joiner = new StringJoiner(fieldDelimiter);
for (int i = 0; i < row.getFields().length; i++) {
Object field = convert(seaTunnelRowType.getFieldType(i), row.getField(i));
String value = field != null ? field.toString() : NULL_VALUE;
joiner.add(value);
}
if (enableDelete) {
joiner.add(parseDeleteSign(row.getRowKind()));
}
return joiner.toString();
return build.serialize(row);
}

public String parseDeleteSign(RowKind rowKind) {
Expand All @@ -105,46 +87,40 @@ public String parseDeleteSign(RowKind rowKind) {
}
}

public static Builder builder() {
return new Builder();
}

/** Builder for RowDataSerializer. */
public static class Builder {
private SeaTunnelRowType seaTunnelRowType;
private String type;
private String fieldDelimiter;
private boolean deletable;

public Builder setType(String type) {
this.type = type;
return this;
}
@Override
public void open() throws IOException {}

public Builder setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
return this;
}
@Override
public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException {

public Builder setFieldDelimiter(String fieldDelimiter) {
this.fieldDelimiter = fieldDelimiter;
return this;
}
List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
List<SeaTunnelDataType<?>> fieldTypes = Arrays.asList(seaTunnelRowType.getFieldTypes());

public Builder enableDelete(boolean deletable) {
this.deletable = deletable;
return this;
if (enableDelete) {
SeaTunnelRow seaTunnelRowEnableDelete = seaTunnelRow.copy();
Hisoka-X marked this conversation as resolved.
Show resolved Hide resolved
seaTunnelRowEnableDelete.setField(
seaTunnelRow.getFields().length, parseDeleteSign(seaTunnelRow.getRowKind()));
fieldNames.add(LoadConstants.DORIS_DELETE_SIGN);
fieldTypes.add(STRING_TYPE);
}

public SeaTunnelRowSerializer build() {
checkState(CSV.equals(type) && fieldDelimiter != null || JSON.equals(type));
return new SeaTunnelRowSerializer(type, seaTunnelRowType, fieldDelimiter, deletable);
if (JSON.equals(type)) {
return buildJsonString(
seaTunnelRow,
new SeaTunnelRowType(
fieldNames.toArray(new String[0]),
fieldTypes.toArray(new SeaTunnelDataType<?>[0])));
} else if (CSV.equals(type)) {
return buildCSVString(
seaTunnelRow,
new SeaTunnelRowType(
fieldNames.toArray(new String[0]),
fieldTypes.toArray(new SeaTunnelDataType<?>[0])));
} else {
throw new IllegalArgumentException("The type " + type + " is not supported!");
}
}

@Override
public void open() throws IOException {}

@Override
public void close() throws IOException {}
}
Loading
Loading