Skip to content

Commit 1462dc5

Browse files
authored
branch-2.1: [bug](auto partition) Fix be crash with single replica insert (#48536)
### What problem does this PR solve? pick:#48101
1 parent 4dbf3eb commit 1462dc5

File tree

9 files changed

+375
-10
lines changed

9 files changed

+375
-10
lines changed

be/src/runtime/tablets_channel.cpp

+9-5
Original file line numberDiff line numberDiff line change
@@ -357,8 +357,6 @@ Status TabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlockReq
357357
// 5. commit all writers
358358

359359
for (auto* writer : need_wait_writers) {
360-
PSlaveTabletNodes slave_nodes;
361-
362360
// close may return failed, but no need to handle it here.
363361
// tablet_vec will only contains success tablet, and then let FE judge it.
364362
_commit_txn(writer, req, res);
@@ -395,9 +393,15 @@ Status TabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlockReq
395393

396394
void TabletsChannel::_commit_txn(DeltaWriter* writer, const PTabletWriterAddBlockRequest& req,
397395
PTabletWriterAddBlockResult* res) {
398-
Status st = writer->commit_txn(_write_single_replica
399-
? req.slave_tablet_nodes().at(writer->tablet_id())
400-
: PSlaveTabletNodes {});
396+
PSlaveTabletNodes slave_nodes;
397+
if (_write_single_replica) {
398+
auto& nodes_map = req.slave_tablet_nodes();
399+
auto it = nodes_map.find(writer->tablet_id());
400+
if (it != nodes_map.end()) {
401+
slave_nodes = it->second;
402+
}
403+
}
404+
Status st = writer->commit_txn(slave_nodes);
401405
if (st.ok()) [[likely]] {
402406
auto* tablet_vec = res->mutable_tablet_vec();
403407
PTabletInfo* tablet_info = tablet_vec->Add();

be/src/vec/sink/vrow_distribution.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ Status VRowDistribution::automatic_create_partition() {
9494
request.__set_db_id(_vpartition->db_id());
9595
request.__set_table_id(_vpartition->table_id());
9696
request.__set_partitionValues(_partitions_need_create);
97+
request.__set_write_single_replica(_write_single_replica);
9798

9899
VLOG_NOTICE << "automatic partition rpc begin request " << request;
99100
TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
@@ -127,6 +128,7 @@ static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& arg
127128
result.nodes = std::move(arg.nodes);
128129
result.partitions = std::move(arg.partitions);
129130
result.tablets = std::move(arg.tablets);
131+
result.slave_tablets = std::move(arg.slave_tablets);
130132
return result;
131133
}
132134

@@ -138,6 +140,7 @@ Status VRowDistribution::_replace_overwriting_partition() {
138140
request.__set_overwrite_group_id(_vpartition->get_overwrite_group_id());
139141
request.__set_db_id(_vpartition->db_id());
140142
request.__set_table_id(_vpartition->table_id());
143+
request.__set_write_single_replica(_write_single_replica);
141144

142145
// only request for partitions not recorded for replacement
143146
std::set<int64_t> id_deduper;

be/src/vec/sink/vrow_distribution.h

+3
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ class VRowDistribution {
7979
const VExprContextSPtrs* vec_output_expr_ctxs = nullptr;
8080
std::shared_ptr<OlapTableSchemaParam> schema;
8181
void* caller = nullptr;
82+
bool write_single_replica = false;
8283
CreatePartitionCallback create_partition_callback;
8384
};
8485
friend class VTabletWriter;
@@ -100,6 +101,7 @@ class VRowDistribution {
100101
_vec_output_expr_ctxs = ctx.vec_output_expr_ctxs;
101102
_schema = ctx.schema;
102103
_caller = ctx.caller;
104+
_write_single_replica = ctx.write_single_replica;
103105
_create_partition_callback = ctx.create_partition_callback;
104106
}
105107

@@ -219,6 +221,7 @@ class VRowDistribution {
219221
CreatePartitionCallback _create_partition_callback = nullptr;
220222
void* _caller = nullptr;
221223
std::shared_ptr<OlapTableSchemaParam> _schema;
224+
bool _write_single_replica = false;
222225

223226
// reuse for find_tablet. save partitions found by find_tablets
224227
std::vector<VOlapTablePartition*> _partitions;

be/src/vec/sink/writer/vtablet_writer.cpp

+6-3
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPart
125125
_has_inc_node = true;
126126
}
127127
LOG(INFO) << "init new node for instance " << _parent->_sender_id
128-
<< ", incremantal:" << incremental;
128+
<< ", node id:" << replica_node_id << ", incremantal:" << incremental;
129129
} else {
130130
channel = it->second;
131131
}
@@ -973,7 +973,8 @@ void VNodeChannel::mark_close(bool hang_wait) {
973973
DCHECK(_pending_blocks.back().second->eos());
974974
_close_time_ms = UnixMillis();
975975
LOG(INFO) << channel_info()
976-
<< " mark closed, left pending batch size: " << _pending_blocks.size();
976+
<< " mark closed, left pending batch size: " << _pending_blocks.size()
977+
<< " hang_wait: " << hang_wait;
977978
}
978979

979980
_eos_is_produced = true;
@@ -1101,7 +1102,8 @@ Status VTabletWriter::on_partitions_created(TCreatePartitionResult* result) {
11011102
auto* new_locations = _pool->add(new std::vector<TTabletLocation>(result->tablets));
11021103
_location->add_locations(*new_locations);
11031104
if (_write_single_replica) {
1104-
_slave_location->add_locations(*new_locations);
1105+
auto* slave_locations = _pool->add(new std::vector<TTabletLocation>(result->slave_tablets));
1106+
_slave_location->add_locations(*slave_locations);
11051107
}
11061108

11071109
// update new node info
@@ -1129,6 +1131,7 @@ Status VTabletWriter::_init_row_distribution() {
11291131
.vec_output_expr_ctxs = &_vec_output_expr_ctxs,
11301132
.schema = _schema,
11311133
.caller = this,
1134+
.write_single_replica = _write_single_replica,
11321135
.create_partition_callback = &vectorized::on_partitions_created});
11331136

11341137
return _row_distribution.open(_output_row_desc);

fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java

+33-2
Original file line numberDiff line numberDiff line change
@@ -261,20 +261,23 @@
261261
import com.google.common.collect.Lists;
262262
import com.google.common.collect.Maps;
263263
import com.google.common.collect.Multimap;
264+
import com.google.common.collect.Sets;
264265
import org.apache.commons.collections.CollectionUtils;
265266
import org.apache.commons.lang3.StringUtils;
266267
import org.apache.logging.log4j.LogManager;
267268
import org.apache.logging.log4j.Logger;
268269
import org.apache.thrift.TException;
269270

270271
import java.io.IOException;
272+
import java.security.SecureRandom;
271273
import java.util.ArrayList;
272274
import java.util.Collection;
273275
import java.util.Collections;
274276
import java.util.HashMap;
275277
import java.util.HashSet;
276278
import java.util.List;
277279
import java.util.Map;
280+
import java.util.Random;
278281
import java.util.Set;
279282
import java.util.concurrent.ConcurrentHashMap;
280283
import java.util.concurrent.ExecutionException;
@@ -3692,6 +3695,7 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t
36923695
// build partition & tablets
36933696
List<TOlapTablePartition> partitions = Lists.newArrayList();
36943697
List<TTabletLocation> tablets = Lists.newArrayList();
3698+
List<TTabletLocation> slaveTablets = new ArrayList<>();
36953699
for (String partitionName : addPartitionClauseMap.keySet()) {
36963700
Partition partition = table.getPartition(partitionName);
36973701
TOlapTablePartition tPartition = new TOlapTablePartition();
@@ -3724,12 +3728,25 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t
37243728
if (bePathsMap.keySet().size() < quorum) {
37253729
LOG.warn("auto go quorum exception");
37263730
}
3727-
tablets.add(new TTabletLocation(tablet.getId(), Lists.newArrayList(bePathsMap.keySet())));
3731+
if (request.isSetWriteSingleReplica() && request.isWriteSingleReplica()) {
3732+
Long[] nodes = bePathsMap.keySet().toArray(new Long[0]);
3733+
Random random = new SecureRandom();
3734+
Long masterNode = nodes[random.nextInt(nodes.length)];
3735+
Multimap<Long, Long> slaveBePathsMap = bePathsMap;
3736+
slaveBePathsMap.removeAll(masterNode);
3737+
tablets.add(new TTabletLocation(tablet.getId(),
3738+
Lists.newArrayList(Sets.newHashSet(masterNode))));
3739+
slaveTablets.add(new TTabletLocation(tablet.getId(),
3740+
Lists.newArrayList(slaveBePathsMap.keySet())));
3741+
} else {
3742+
tablets.add(new TTabletLocation(tablet.getId(), Lists.newArrayList(bePathsMap.keySet())));
3743+
}
37283744
}
37293745
}
37303746
}
37313747
result.setPartitions(partitions);
37323748
result.setTablets(tablets);
3749+
result.setSlaveTablets(slaveTablets);
37333750

37343751
// build nodes
37353752
List<TNodeInfo> nodeInfos = Lists.newArrayList();
@@ -3885,6 +3902,7 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request
38853902
// so they won't be changed again. if other transaction changing it. just let it fail.
38863903
List<TOlapTablePartition> partitions = new ArrayList<>();
38873904
List<TTabletLocation> tablets = new ArrayList<>();
3905+
List<TTabletLocation> slaveTablets = new ArrayList<>();
38883906
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
38893907
for (long partitionId : resultPartitionIds) {
38903908
Partition partition = olapTable.getPartition(partitionId);
@@ -3920,12 +3938,25 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request
39203938
if (bePathsMap.keySet().size() < quorum) {
39213939
LOG.warn("auto go quorum exception");
39223940
}
3923-
tablets.add(new TTabletLocation(tablet.getId(), Lists.newArrayList(bePathsMap.keySet())));
3941+
if (request.isSetWriteSingleReplica() && request.isWriteSingleReplica()) {
3942+
Long[] nodes = bePathsMap.keySet().toArray(new Long[0]);
3943+
Random random = new SecureRandom();
3944+
Long masterNode = nodes[random.nextInt(nodes.length)];
3945+
Multimap<Long, Long> slaveBePathsMap = bePathsMap;
3946+
slaveBePathsMap.removeAll(masterNode);
3947+
tablets.add(new TTabletLocation(tablet.getId(),
3948+
Lists.newArrayList(Sets.newHashSet(masterNode))));
3949+
slaveTablets.add(new TTabletLocation(tablet.getId(),
3950+
Lists.newArrayList(slaveBePathsMap.keySet())));
3951+
} else {
3952+
tablets.add(new TTabletLocation(tablet.getId(), Lists.newArrayList(bePathsMap.keySet())));
3953+
}
39243954
}
39253955
}
39263956
}
39273957
result.setPartitions(partitions);
39283958
result.setTablets(tablets);
3959+
result.setSlaveTablets(slaveTablets);
39293960

39303961
// build nodes
39313962
List<TNodeInfo> nodeInfos = Lists.newArrayList();

gensrc/thrift/FrontendService.thrift

+4
Original file line numberDiff line numberDiff line change
@@ -1483,13 +1483,15 @@ struct TCreatePartitionRequest {
14831483
3: optional i64 table_id
14841484
// for each partition column's partition values. [missing_rows, partition_keys]->Left bound(for range) or Point(for list)
14851485
4: optional list<list<Exprs.TNullableStringLiteral>> partitionValues
1486+
5: optional bool write_single_replica = false
14861487
}
14871488

14881489
struct TCreatePartitionResult {
14891490
1: optional Status.TStatus status
14901491
2: optional list<Descriptors.TOlapTablePartition> partitions
14911492
3: optional list<Descriptors.TTabletLocation> tablets
14921493
4: optional list<Descriptors.TNodeInfo> nodes
1494+
5: optional list<Descriptors.TTabletLocation> slave_tablets
14931495
}
14941496

14951497
// these two for auto detect replacing partition
@@ -1498,13 +1500,15 @@ struct TReplacePartitionRequest {
14981500
2: optional i64 db_id
14991501
3: optional i64 table_id
15001502
4: optional list<i64> partition_ids // partition to replace.
1503+
5: optional bool write_single_replica = false
15011504
}
15021505

15031506
struct TReplacePartitionResult {
15041507
1: optional Status.TStatus status
15051508
2: optional list<Descriptors.TOlapTablePartition> partitions
15061509
3: optional list<Descriptors.TTabletLocation> tablets
15071510
4: optional list<Descriptors.TNodeInfo> nodes
1511+
5: optional list<Descriptors.TTabletLocation> slave_tablets
15081512
}
15091513

15101514
struct TGetMetaReplica {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
-3590935922607536626,-,2025-02-16,星辰医疗科技有限公司
2+
-3590935906895636626,-,2025-02-16,未来健康产业
3+
123,-,2025-02-16,蓝海生物有限公司
4+
100000048812501,-,2025-02-16,阳光医疗集团
5+
1000000076784258,-,2025-02-16,华夏健康科技
6+
1000022060522735,-,2025-02-16,瑞丰生物医药
7+
1000022193719484,-,2025-02-16,盛世医疗服务
8+
1000031422678924,-,2025-02-16,康宁健康有限公司
9+
1000085651028900,-,2025-02-16,史前生物科技
10+
1000093620518989,-,2025-02-16,健康之路公司
11+
1000103471774704,-,2025-02-16,安康医疗科技
12+
1000138777615262,-,2025-02-16,福瑞堂生物
13+
1000156823071129,-,2025-02-16,优质生活科技
14+
1000191711015262,-,2025-02-16,健康未来企业
15+
1000633041475486,-,2025-02-16,天使医疗集团
16+
1000681518627336,-,2025-02-16,百年健康公司
17+
1002458253925730,-,2025-02-16,康乐园生物科技
18+
1008126191610424,-,2025-02-16,华康医药有限公司
19+
1071904784424147,-,2025-02-16,金桥健康产业
20+
1076564522324147,-,2025-02-16,乐活医疗科技
21+
1202217708798485,-,2025-02-16,健康家园公司
22+
1224474148903456,-,2025-02-16,康泰生物科技
23+
2043829367811999,-,2025-02-16,未来医疗集团
24+
2191851926844270,-,2025-02-16,健康之源公司
25+
2232379824950609,-,2025-02-16,安宁医药有限公司
26+
2350341369782152,-,2025-02-16,和谐生物科技
27+
2548383917911403,-,2025-02-16,康健医疗服务
28+
2640774381717600,-,2025-02-16,瑞康医药有限公司
29+
2754625269782961,-,2025-02-16,乐享健康产业
30+
3064667398063809,-,2025-02-16,健康先锋公司
31+
3102689636972458,-,2025-02-16,安康生物科技
32+
3291916164371209,-,2025-02-16,未来之星医疗
33+
3946909802002976,-,2025-02-16,健康梦想公司
34+
3965513055005942,-,2025-02-16,康乐生物科技
35+
4143117309325214,-,2025-02-16,安宁健康产业
36+
4175970196426577,-,2025-02-16,乐活医疗集团
37+
4294566787233969,-,2025-02-16,健康之道公司
38+
4610682351457207,-,2025-02-16,瑞丰医疗科技
39+
4674640812462217,-,2025-02-16,未来健康企业
40+
4676494238858307,-,2025-02-16,安康医药有限公司
41+
4937264996861701,-,2025-02-16,乐享健康公司
42+
4947288173569190,-,2025-02-16,康宁医疗集团
43+
5115179098054305,-,2025-02-16,健康家园科技
44+
10000000430024147,-,2025-02-16,阳光医疗有限公司
45+
10000021073673208,-,2025-02-16,未来生物科技
46+
10000130032642122,-,2025-02-16,和谐健康产业
47+
10000365660973707,-,2025-02-16,安宁医药公司
48+
10000453096993544,-,2025-02-16,傻乐生物科技
49+
10000789012345678,-,2025-02-16,星辉医疗科技
50+
10000890123456789,-,2025-02-16,未来健康服务
51+
10000901234567890,-,2025-02-16,蓝天生物科技
52+
10001012345678901,-,2025-02-16,阳光健康产业
53+
10001123456789012,-,2025-02-16,华康医疗集团
54+
10001234567890123,-,2025-02-16,易丰生物医药
55+
10001345678901234,-,2025-02-16,盛世健康科技
56+
10001456789012345,-,2025-02-16,康宁医疗服务
57+
10001567890123456,-,2025-02-16,和谐生物公司
58+
10001678901234567,-,2025-02-16,健康之路科技
59+
10001789012345678,-,2025-02-16,安康生物产业
60+
10001890123456789,-,2025-02-16,福瑞堂医疗
61+
10001901234567890,-,2025-02-16,未来生活科技
62+
10002012345678901,-,2025-02-16,美好未来企业
63+
10002123456789012,-,2025-02-16,金地医疗集团
64+
10002234567890123,-,2025-02-16,老头健康公司
65+
10002345678901234,-,2025-02-16,平安园生物科技
66+
10002456789012345,-,2025-02-16,闪电医药有限公司
67+
10002567890123456,-,2025-02-16,铜桥健康产业
68+
10002678901234567,-,2025-02-16,乐天医疗科技
69+
10002789012345678,-,2025-02-16,健康成长公司
70+
10002890123456789,-,2025-02-16,尖端生物科技
71+
10002901234567890,-,2025-02-16,保护伞医疗集团
72+
10003012345678901,-,2025-02-16,青春之源公司
73+
10003123456789012,-,2025-02-16,大森林医药有限公司
74+
10003234567890123,-,2025-02-16,毒蛇生物科技
75+
10003345678901234,-,2025-02-16,金地医疗服务
76+
10003456789012345,-,2025-02-16,瑞丰医药有限公司
77+
10003567890123456,-,2025-02-16,乐游娱乐产业
78+
10003678901234567,-,2025-02-16,康岩先锋公司

0 commit comments

Comments
 (0)