diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java index 7e4a53372..2510fe30f 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java @@ -514,7 +514,15 @@ public StreamLoadProperties getProperties(@Nullable StarRocksSinkTable table) { .enableUpsertDelete(supportUpsertDelete()); if (hasColumnMappingProperty()) { - defaultTablePropertiesBuilder.columns(streamLoadProps.get("columns")); + List columns = new ArrayList<>(Arrays.asList(streamLoadProps.get("columns").split(","))); + if (supportUpsertDelete()) { + // auto add `__op` for primary key table even when user specified `sink.properties.columns`. + // in case user use a bitmap datatype and need set up `sink.properties.columns`, may forget to add `__op`. + if (columns.stream().noneMatch(it -> it.equals("__op"))) { + columns.add("__op"); + } + } + defaultTablePropertiesBuilder.columns(String.join(",", columns)); } else if (getTableSchemaFieldNames() != null) { // don't need to add "columns" header in following cases // 1. use csv format but the flink and starrocks schemas are aligned