@@ -340,10 +340,13 @@ apply(#{index := Index,
340340 {State , Reply , Effects }
341341 end
342342 end ;
343- apply (Meta , # checkout {spec = cancel , consumer_id = ConsumerId }, State0 ) ->
344- {State , Effects } = cancel_consumer (Meta , ConsumerId , State0 , [],
345- consumer_cancel ),
346- checkout (Meta , State0 , State , Effects );
343+ apply (#{index := Idx } = Meta ,
344+ # checkout {spec = cancel ,
345+ consumer_id = ConsumerId }, State0 ) ->
346+ {State1 , Effects1 } = cancel_consumer (Meta , ConsumerId , State0 , [],
347+ consumer_cancel ),
348+ {State , Reply , Effects } = checkout (Meta , State0 , State1 , Effects1 ),
349+ update_smallest_raft_index (Idx , Reply , State , Effects );
347350apply (Meta , # checkout {spec = Spec , meta = ConsumerMeta ,
348351 consumer_id = {_ , Pid } = ConsumerId },
349352 State0 ) ->
@@ -372,8 +375,8 @@ apply(#{index := Index}, #purge{},
372375 {State , _ , Effects } = evaluate_limit (Index , false , State0 ,
373376 State1 , Effects0 ),
374377 update_smallest_raft_index (Index , Reply , State , Effects );
375- apply (_Meta , # garbage_collection {}, State ) ->
376- { State , ok , [{aux , garbage_collection }]} ;
378+ apply (#{ index : = Idx } , # garbage_collection {}, State ) ->
379+ update_smallest_raft_index ( Idx , ok , State , [{aux , garbage_collection }]) ;
377380apply (#{system_time := Ts } = Meta , {down , Pid , noconnection },
378381 #? MODULE {consumers = Cons0 ,
379382 cfg = # cfg {consumer_strategy = single_active },
@@ -506,13 +509,14 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
506509 checkout (Meta , State0 , State , Effects );
507510apply (_ , {nodedown , _Node }, State ) ->
508511 {State , ok };
509- apply (Meta , # purge_nodes {nodes = Nodes }, State0 ) ->
512+ apply (#{ index : = Idx } = Meta , # purge_nodes {nodes = Nodes }, State0 ) ->
510513 {State , Effects } = lists :foldl (fun (Node , {S , E }) ->
511514 purge_node (Meta , Node , S , E )
512515 end , {State0 , []}, Nodes ),
513- {State , ok , Effects };
514- apply (Meta , # update_config {config = Conf }, State ) ->
515- checkout (Meta , State , update_config (Conf , State ), []);
516+ update_smallest_raft_index (Idx , ok , State , Effects );
517+ apply (#{index := Idx } = Meta , # update_config {config = Conf }, State0 ) ->
518+ {State , Reply , Effects } = checkout (Meta , State0 , update_config (Conf , State0 ), []),
519+ update_smallest_raft_index (Idx , Reply , State , Effects );
516520apply (_Meta , {machine_version , 0 , 1 }, V0State ) ->
517521 State = convert_v0_to_v1 (V0State ),
518522 {State , ok , []};
@@ -1750,25 +1754,26 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
17501754 messages = Messages0 ,
17511755 consumers = Cons0 } = InitState ) ->
17521756 case priority_queue :out (SQ0 ) of
1753- {{value , ConsumerId }, SQ1 } ->
1757+ {{value , ConsumerId }, SQ1 }
1758+ when is_map_key (ConsumerId , Cons0 ) ->
17541759 case take_next_msg (InitState ) of
17551760 {ConsumerMsg , State0 } ->
17561761 % % there are consumers waiting to be serviced
17571762 % % process consumer checkout
1758- case maps :find (ConsumerId , Cons0 ) of
1759- { ok , # consumer {credit = 0 } } ->
1763+ case maps :get (ConsumerId , Cons0 ) of
1764+ # consumer {credit = 0 } ->
17601765 % % no credit but was still on queue
17611766 % % can happen when draining
17621767 % % recurse without consumer on queue
17631768 checkout_one (Meta , InitState #? MODULE {service_queue = SQ1 });
1764- { ok , # consumer {status = cancelled } } ->
1769+ # consumer {status = cancelled } ->
17651770 checkout_one (Meta , InitState #? MODULE {service_queue = SQ1 });
1766- { ok , # consumer {status = suspected_down } } ->
1771+ # consumer {status = suspected_down } ->
17671772 checkout_one (Meta , InitState #? MODULE {service_queue = SQ1 });
1768- { ok , # consumer {checked_out = Checked0 ,
1769- next_msg_id = Next ,
1770- credit = Credit ,
1771- delivery_count = DelCnt } = Con0 } ->
1773+ # consumer {checked_out = Checked0 ,
1774+ next_msg_id = Next ,
1775+ credit = Credit ,
1776+ delivery_count = DelCnt } = Con0 ->
17721777 Checked = maps :put (Next , ConsumerMsg , Checked0 ),
17731778 Con = Con0 # consumer {checked_out = Checked ,
17741779 next_msg_id = Next + 1 ,
@@ -1795,14 +1800,14 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
17951800 add_bytes_checkout (Header , State1 )),
17961801 M }
17971802 end ,
1798- {success , ConsumerId , Next , Msg , State };
1799- error ->
1800- % % consumer did not exist but was queued, recurse
1801- checkout_one (Meta , InitState #? MODULE {service_queue = SQ1 })
1803+ {success , ConsumerId , Next , Msg , State }
18021804 end ;
18031805 empty ->
18041806 {nochange , InitState }
18051807 end ;
1808+ {{value , _ConsumerId }, SQ1 } ->
1809+ % % consumer did not exist but was queued, recurse
1810+ checkout_one (Meta , InitState #? MODULE {service_queue = SQ1 });
18061811 {empty , _ } ->
18071812 case lqueue :len (Messages0 ) of
18081813 0 -> {nochange , InitState };
0 commit comments