diff --git a/src/osiris.erl b/src/osiris.erl index 8d95d043..25396c8e 100644 --- a/src/osiris.erl +++ b/src/osiris.erl @@ -40,6 +40,7 @@ reference => term(), event_formatter => {module(), atom(), list()}, retention => [osiris:retention_spec()], + features => features(), atom() => term()}. -type mfarg() :: {module(), atom(), list()}. @@ -77,6 +78,7 @@ -type data() :: iodata() | batch() | {filter_value(), iodata() | batch()}. +-type features() :: #{committed_offset_calculate => boolean()}. %% returned when reading -type entry() :: binary() | batch(). @@ -307,13 +309,16 @@ configure_logger(Module) -> persistent_term:put('$osiris_logger', Module). -spec get_stats(pid()) -> #{committed_chunk_id => integer(), - first_chunk_id => integer()}. + first_chunk_id => integer(), + last_chunk_id => integer(), + committed_offset => integer()}. get_stats(Pid) when node(Pid) =:= node() -> #{shared := Shared} = osiris_util:get_reader_context(Pid), #{committed_chunk_id => osiris_log_shared:committed_chunk_id(Shared), first_chunk_id => osiris_log_shared:first_chunk_id(Shared), - last_chunk_id => osiris_log_shared:last_chunk_id(Shared)}; + last_chunk_id => osiris_log_shared:last_chunk_id(Shared), + committed_offset => osiris_log_shared:committed_offset(Shared)}; get_stats(Pid) when is_pid(Pid) -> erpc:call(node(Pid), ?MODULE, ?FUNCTION_NAME, [Pid]). diff --git a/src/osiris.hrl b/src/osiris.hrl index ffb39c90..0fae757e 100644 --- a/src/osiris.hrl +++ b/src/osiris.hrl @@ -30,6 +30,11 @@ domain => [osiris]}), ok). +%% tail info pattern matching +-define(TAIL_INFO(ChunkId, Ts), {_, {_, ChunkId, Ts}}). +-define(TAIL_INFO(ChunkId), ?TAIL_INFO(ChunkId, _)). +-define(TAIL_INFO_NEXT(NextOffset, ChunkId), {NextOffset, {_, ChunkId, _}}). + -define(IS_STRING(S), is_list(S) orelse is_binary(S)). -define(C_NUM_LOG_FIELDS, 5). 
diff --git a/src/osiris_counters.erl b/src/osiris_counters.erl index 04975b94..65ae16e0 100644 --- a/src/osiris_counters.erl +++ b/src/osiris_counters.erl @@ -12,6 +12,7 @@ fetch/1, overview/0, overview/1, + counters/2, delete/1 ]). @@ -45,3 +46,7 @@ overview() -> -spec overview(name()) -> #{atom() => non_neg_integer()} | undefined. overview(Name) -> seshat:counters(osiris, Name). + +-spec counters(name(), [atom()]) -> #{atom() => non_neg_integer()} | undefined. +counters(Name, Fields) -> + seshat:counters(osiris, Name, Fields). diff --git a/src/osiris_log.erl b/src/osiris_log.erl index 9b598fb4..e7e137df 100644 --- a/src/osiris_log.erl +++ b/src/osiris_log.erl @@ -36,9 +36,10 @@ read_chunk/1, read_chunk_parsed/1, read_chunk_parsed/2, - committed_offset/1, committed_chunk_id/1, + committed_offset/1, set_committed_chunk_id/2, + set_committed_offset/2, last_chunk_id/1, get_current_epoch/1, get_directory/1, @@ -1353,10 +1354,6 @@ last_user_chunk_id_in_index(NextPos, IdxFd) -> Error end. --spec committed_offset(state()) -> integer(). -committed_offset(State) -> - committed_chunk_id(State). - -spec committed_chunk_id(state()) -> integer(). committed_chunk_id(#?MODULE{cfg = #cfg{shared = Ref}}) -> osiris_log_shared:committed_chunk_id(Ref). @@ -1367,6 +1364,16 @@ set_committed_chunk_id(#?MODULE{mode = #write{}, when is_integer(ChunkId) -> osiris_log_shared:set_committed_chunk_id(Ref, ChunkId). +-spec set_committed_offset(state(), offset()) -> ok. +set_committed_offset(#?MODULE{mode = #write{}, + cfg = #cfg{shared = Ref}}, Offset) + when is_integer(Offset) -> + osiris_log_shared:set_committed_offset(Ref, Offset). + +-spec committed_offset(state()) -> integer(). +committed_offset(#?MODULE{cfg = #cfg{shared = Ref}}) -> + osiris_log_shared:committed_offset(Ref). + -spec last_chunk_id(state()) -> integer(). last_chunk_id(#?MODULE{cfg = #cfg{shared = Ref}}) -> osiris_log_shared:last_chunk_id(Ref). 
diff --git a/src/osiris_log_shared.erl b/src/osiris_log_shared.erl index d2986af8..d63c49dd 100644 --- a/src/osiris_log_shared.erl +++ b/src/osiris_log_shared.erl @@ -1,34 +1,39 @@ -module(osiris_log_shared). --define(COMMITTED_IDX, 1). +-define(COMMITTED_CHUNK_ID, 1). -define(FIRST_IDX, 2). -define(LAST_IDX, 3). +-define(COMMITTED_OFFSET, 4). -export([ new/0, committed_chunk_id/1, first_chunk_id/1, last_chunk_id/1, + committed_offset/1, set_committed_chunk_id/2, set_first_chunk_id/2, - set_last_chunk_id/2 + set_last_chunk_id/2, + set_committed_offset/2 ]). -type chunk_id() :: -1 | non_neg_integer(). +-type offset() :: -1 | non_neg_integer(). -spec new() -> atomics:atomics_ref(). new() -> %% Oh why, oh why did we think the first chunk id in %% a stream should have offset 0? - Ref = atomics:new(3, [{signed, true}]), - atomics:put(Ref, ?COMMITTED_IDX, -1), + Ref = atomics:new(4, [{signed, true}]), + atomics:put(Ref, ?COMMITTED_CHUNK_ID, -1), atomics:put(Ref, ?FIRST_IDX, -1), atomics:put(Ref, ?LAST_IDX, -1), + atomics:put(Ref, ?COMMITTED_OFFSET, -1), Ref. -spec committed_chunk_id(atomics:atomics_ref()) -> chunk_id(). committed_chunk_id(Ref) -> - atomics:get(Ref, ?COMMITTED_IDX). + atomics:get(Ref, ?COMMITTED_CHUNK_ID). -spec first_chunk_id(atomics:atomics_ref()) -> chunk_id(). first_chunk_id(Ref) -> @@ -38,9 +43,13 @@ first_chunk_id(Ref) -> last_chunk_id(Ref) -> atomics:get(Ref, ?LAST_IDX). +-spec committed_offset(atomics:atomics_ref()) -> offset(). +committed_offset(Ref) -> + atomics:get(Ref, ?COMMITTED_OFFSET). + -spec set_committed_chunk_id(atomics:atomics_ref(), chunk_id()) -> ok. set_committed_chunk_id(Ref, Value) when is_integer(Value) -> - atomics:put(Ref, ?COMMITTED_IDX, Value). + atomics:put(Ref, ?COMMITTED_CHUNK_ID, Value). -spec set_first_chunk_id(atomics:atomics_ref(), chunk_id()) -> ok. 
set_first_chunk_id(Ref, Value) when is_integer(Value) -> @@ -50,6 +59,9 @@ set_first_chunk_id(Ref, Value) when is_integer(Value) -> set_last_chunk_id(Ref, Value) when is_integer(Value) -> atomics:put(Ref, ?LAST_IDX, Value). +-spec set_committed_offset(atomics:atomics_ref(), offset()) -> ok. +set_committed_offset(Ref, Value) when is_integer(Value) -> + atomics:put(Ref, ?COMMITTED_OFFSET, Value). -ifdef(TEST). @@ -60,12 +72,15 @@ basics_test() -> ?assertEqual(-1, committed_chunk_id(R)), ?assertEqual(-1, first_chunk_id(R)), ?assertEqual(-1, last_chunk_id(R)), + ?assertEqual(-1, committed_offset(R)), ok = set_committed_chunk_id(R, 2), + ok = set_committed_offset(R, 3), ok = set_first_chunk_id(R, 1), - ok = set_last_chunk_id(R, 3), + ok = set_last_chunk_id(R, 4), ?assertEqual(2, committed_chunk_id(R)), + ?assertEqual(3, committed_offset(R)), ?assertEqual(1, first_chunk_id(R)), - ?assertEqual(3, last_chunk_id(R)), + ?assertEqual(4, last_chunk_id(R)), ok. diff --git a/src/osiris_replica.erl b/src/osiris_replica.erl index 733356ae..5d73b2de 100644 --- a/src/osiris_replica.erl +++ b/src/osiris_replica.erl @@ -50,7 +50,8 @@ reference :: term(), event_formatter :: undefined | mfa(), counter :: counters:counters_ref(), - token :: undefined | binary()}). + token :: undefined | binary(), + committed_offset_calculate :: boolean()}). -type parse_state() :: undefined | @@ -75,12 +76,15 @@ -define(C_PACKETS, ?C_NUM_LOG_FIELDS + 3). -define(C_READERS, ?C_NUM_LOG_FIELDS + 4). -define(C_EPOCH, ?C_NUM_LOG_FIELDS + 5). +-define(C_COMMITTED_CHUNK_ID, ?C_NUM_LOG_FIELDS + 6). -define(ADD_COUNTER_FIELDS, [{committed_offset, ?C_COMMITTED_OFFSET, counter, "Last committed offset"}, {forced_gcs, ?C_FORCED_GCS, counter, "Number of garbage collection runs"}, {packets, ?C_PACKETS, counter, "Number of packets"}, {readers, ?C_READERS, counter, "Number of readers"}, - {epoch, ?C_EPOCH, counter, "Current epoch"}]). 
+ {epoch, ?C_EPOCH, counter, "Current epoch"}, + {committed_chunk_id, ?C_COMMITTED_CHUNK_ID, counter, "Last committed chunk ID"} + ]). -define(FIELDSPEC_KEY, osiris_replica_seshat_fields_spec). -define(DEFAULT_ONE_TIME_TOKEN_TIMEOUT, 30000). @@ -194,10 +198,10 @@ handle_continue(#{name := Name0, case LastChunk of empty -> ok; - {_, LastChId, LastTs} -> + _ -> %% need to ack last chunk back to leader so that it can %% re-discover the committed offset - osiris_writer:ack(LeaderPid, {LastChId, LastTs}) + osiris_writer:ack(LeaderPid, ack_msg(Config, TailInfo)) end, ?INFO_(Name, "osiris replica starting in epoch ~b, next offset ~b, tail info ~w", [Epoch, NextOffset, TailInfo]), @@ -236,6 +240,10 @@ handle_continue(#{name := Name0, Acceptor = spawn_link(fun() -> accept(Name, Transport, LSock, Self) end), ?DEBUG_(Name, "starting replica reader on node '~w'", [Node]), + CmttedOfstCalculate = committed_offset_calculate(Config), + Features0 = maps:get(features, Config, #{}), + Features1 = Features0#{committed_offset_calculate => + committed_offset_calculate(Config)}, ReplicaReaderConf = #{hosts => IpsHosts, port => Port, transport => Transport, @@ -244,7 +252,8 @@ handle_continue(#{name := Name0, leader_pid => LeaderPid, start_offset => TailInfo, reference => ExtRef, - connection_token => Token}, + connection_token => Token, + features => Features1}, case osiris_replica_reader:start(Node, ReplicaReaderConf) of {ok, RRPid} -> true = link(RRPid), @@ -260,6 +269,7 @@ handle_continue(#{name := Name0, false -> infinity end, + counters:put(CntRef, ?C_COMMITTED_CHUNK_ID, -1), counters:put(CntRef, ?C_COMMITTED_OFFSET, -1), counters:put(CntRef, ?C_EPOCH, Epoch), Shared = osiris_log:get_shared(Log), @@ -281,7 +291,8 @@ handle_continue(#{name := Name0, event_formatter = EvtFmt, counter = CntRef, token = Token, - transport = Transport}, + transport = Transport, + committed_offset_calculate = CmttedOfstCalculate}, log = Log, parse_state = undefined}}; {error, {connection_refused = R, _}} 
-> @@ -424,7 +435,7 @@ handle_call(Unknown, _From, %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_cast({committed_offset, CommittedChId}, +handle_cast({committed_offset, {CommittedChId, LastOffset}}, #?MODULE{cfg = #cfg{counter = Cnt}, log = Log, committed_chunk_id = LastCommittedChId} = @@ -432,14 +443,18 @@ handle_cast({committed_offset, CommittedChId}, case CommittedChId > LastCommittedChId of true -> %% notify offset listeners - counters:put(Cnt, ?C_COMMITTED_OFFSET, CommittedChId), + counters:put(Cnt, ?C_COMMITTED_CHUNK_ID, CommittedChId), + counters:put(Cnt, ?C_COMMITTED_OFFSET, LastOffset), ok = osiris_log:set_committed_chunk_id(Log, CommittedChId), + ok = osiris_log:set_committed_offset(Log, LastOffset), {noreply, notify_offset_listeners( State#?MODULE{committed_chunk_id = CommittedChId})}; false -> State end; +handle_cast({committed_offset, CommittedChId}, State) -> + handle_cast({committed_offset, {CommittedChId, -1}}, State); handle_cast({register_offset_listener, Pid, EvtFormatter, Offset}, #?MODULE{cfg = #cfg{reference = Ref, event_formatter = DefaultFmt}, @@ -572,7 +587,7 @@ handle_incoming_data(Socket, Bin, #cfg{socket = Socket, leader_pid = LeaderPid, transport = Transport, - counter = Cnt}, + counter = Cnt} = Cfg, parse_state = ParseState0, log = Log0} = State0) -> @@ -594,7 +609,8 @@ handle_incoming_data(Socket, Bin, undefined -> {noreply, State1}; _ -> - ok = osiris_writer:ack(LeaderPid, OffsetTimestamp), + TailInfo = osiris_log:tail_info(Log), + ok = osiris_writer:ack(LeaderPid, ack_msg(Cfg, TailInfo)), State = notify_offset_listeners(State1), {noreply, State} end. @@ -738,7 +754,7 @@ notify_offset_listeners(#?MODULE{cfg = #cfg{reference = Ref, State#?MODULE{offset_listeners = L}. max_readable_chunk_id(Log) -> - min(osiris_log:committed_offset(Log), osiris_log:last_chunk_id(Log)). + min(osiris_log:committed_chunk_id(Log), osiris_log:last_chunk_id(Log)). 
%% INTERNAL @@ -796,3 +812,20 @@ listen(ssl, Port, Options) -> init_fields_spec() -> persistent_term:put(?FIELDSPEC_KEY, ?ADD_COUNTER_FIELDS ++ osiris_log:counter_fields()). + +ack_msg(Cfg, TailInfo) -> + case committed_offset_calculate(Cfg) of + true -> + TailInfo; + false -> + ?TAIL_INFO(TailChkId, TailTs) = TailInfo, + {TailChkId, TailTs} + end. + +committed_offset_calculate(#cfg{committed_offset_calculate = On}) -> + On; +committed_offset_calculate(#{features := #{committed_offset_calculate := On}}) + when is_boolean(On) -> + On; +committed_offset_calculate(_) -> + false. diff --git a/src/osiris_replica_reader.erl b/src/osiris_replica_reader.erl index ac195e38..c0415e78 100644 --- a/src/osiris_replica_reader.erl +++ b/src/osiris_replica_reader.erl @@ -41,7 +41,8 @@ counter :: counters:counters_ref(), counter_id :: term(), committed_offset = -1 :: -1 | osiris:offset(), - offset_listener :: undefined | osiris:offset()}). + offset_listener :: undefined | osiris:offset(), + committed_offset_calculate :: boolean()}). -define(C_OFFSET_LISTENERS, ?C_NUM_LOG_FIELDS + 1). 
-define(COUNTER_FIELDS, @@ -114,7 +115,7 @@ init(#{hosts := Hosts, leader_pid := LeaderPid, start_offset := {StartOffset, _} = TailInfo, reference := ExtRef, - connection_token := Token}) -> + connection_token := Token} = Config) -> process_flag(trap_exit, true), ?DEBUG("~ts: trying to connect to replica at ~0p", [Name, Hosts]), @@ -125,30 +126,32 @@ init(#{hosts := Hosts, [Host, Port]), CntId = {?MODULE, ExtRef, Host, Port}, CntSpec = {CntId, {persistent_term, ?FIELDSPEC_KEY}}, - Config = #{counter_spec => CntSpec, transport => Transport}, + ReadCfg = #{counter_spec => CntSpec, transport => Transport}, %% send token to replica to complete connection setup ok = send(Transport, Sock, Token), - Ret = osiris_writer:init_data_reader(LeaderPid, TailInfo, Config), + Ret = osiris_writer:init_data_reader(LeaderPid, TailInfo, ReadCfg), case Ret of {ok, Log} -> CntRef = osiris_log:counters_ref(Log), ?INFO_(Name, "starting osiris replica reader at offset ~b", [osiris_log:next_offset(Log)]), + CmttedOfstCalculate = committed_offset_calculate(Config), %% register data listener with osiris_proc ok = osiris_writer:register_data_listener(LeaderPid, StartOffset), MRef = monitor(process, LeaderPid), State = maybe_register_offset_listener( - maybe_send_committed_offset(#state{log = Log, - name = Name, - transport = Transport, - socket = Sock, - replica_pid = ReplicaPid, - leader_pid = LeaderPid, - leader_monitor_ref = MRef, - counter = CntRef, - counter_id = CntId})), + maybe_send_committed_chunk_id(#state{log = Log, + name = Name, + transport = Transport, + socket = Sock, + replica_pid = ReplicaPid, + leader_pid = LeaderPid, + leader_monitor_ref = MRef, + counter = CntRef, + counter_id = CntId, + committed_offset_calculate = CmttedOfstCalculate})), {ok, State}; {error, no_process} -> ?WARN_(Name, @@ -242,7 +245,7 @@ maybe_register_offset_listener(State) -> %% @end %%-------------------------------------------------------------------- handle_info({osiris_offset, _, _Offs}, State0) -> 
- State1 = maybe_send_committed_offset(State0), + State1 = maybe_send_committed_chunk_id(State0), State = maybe_register_offset_listener(State1#state{offset_listener = undefined}), @@ -333,7 +336,7 @@ do_sendfile0(#state{name = Name, socket = Sock, transport = Transport, log = Log0} = State0) -> - State = maybe_send_committed_offset(State0), + State = maybe_send_committed_chunk_id(State0), case osiris_log:send_file(Sock, Log0) of {ok, Log} -> do_sendfile0(State#state{log = Log}); @@ -348,16 +351,23 @@ do_sendfile0(#state{name = Name, State#state{log = Log} end. -maybe_send_committed_offset(#state{log = Log, - committed_offset = Last, - replica_pid = RPid} = - State) -> - COffs = osiris_log:committed_offset(Log), +maybe_send_committed_chunk_id(#state{log = Log, + committed_offset = Last, + replica_pid = RPid} = + State) -> + COffs = osiris_log:committed_chunk_id(Log), case COffs of COffs when COffs > Last -> - ok = - erlang:send(RPid, {'$gen_cast', {committed_offset, COffs}}, - [noconnect, nosuspend]), + LastOffset = osiris_log:committed_offset(Log), + Msg = case committed_offset_calculate(State) of + true -> + {COffs, LastOffset}; + false -> + COffs + end, + + ok = erlang:send(RPid, {'$gen_cast', {committed_offset, Msg}}, + [noconnect, nosuspend]), State#state{committed_offset = COffs}; _ -> State @@ -438,3 +448,11 @@ maybe_add_sni_option(_) -> init_fields_spec() -> persistent_term:put(?FIELDSPEC_KEY, ?COUNTER_FIELDS ++ osiris_log:counter_fields()). + +committed_offset_calculate(#state{committed_offset_calculate = On}) -> + On; +committed_offset_calculate(#{features := #{committed_offset_calculate := On}}) + when is_boolean(On) -> + On; +committed_offset_calculate(_) -> + false. diff --git a/src/osiris_writer.erl b/src/osiris_writer.erl index 6e9a42b5..8041ffda 100644 --- a/src/osiris_writer.erl +++ b/src/osiris_writer.erl @@ -43,11 +43,13 @@ -define(C_COMMITTED_OFFSET, ?C_NUM_LOG_FIELDS + 1). -define(C_READERS, ?C_NUM_LOG_FIELDS + 2). 
-define(C_EPOCH, ?C_NUM_LOG_FIELDS + 3). +-define(C_COMMITTED_CHUNK_ID, ?C_NUM_LOG_FIELDS + 4). -define(ADD_COUNTER_FIELDS, [ {committed_offset, ?C_COMMITTED_OFFSET, counter, "Last committed offset"}, {readers, ?C_READERS, counter, "Number of readers"}, - {epoch, ?C_EPOCH, counter, "Current epoch"} + {epoch, ?C_EPOCH, counter, "Current epoch"}, + {committed_chunk_id, ?C_COMMITTED_CHUNK_ID, counter, "Last committed chunk ID"} ] ). -define(FIELDSPEC_KEY, osiris_writer_seshat_fields_spec). @@ -68,14 +70,14 @@ {cfg :: #cfg{}, log :: osiris_log:state(), tracking :: osiris_tracking:state(), - replica_state = #{} :: #{node() => {osiris:offset(), osiris:timestamp()}}, + replica_state = #{} :: #{node() => osiris:tail_info()}, pending_corrs = queue:new() :: queue:queue(), duplicates = [] :: [{osiris:offset(), pid(), osiris:writer_id(), non_neg_integer()}], data_listeners = [] :: [{pid(), osiris:offset()}], offset_listeners = [] :: [{pid(), osiris:offset(), mfa() | undefined}], - committed_offset = -1 :: osiris:offset()}). + committed_chunk_id = -1 :: osiris:offset()}). -opaque state() :: #?MODULE{}. @@ -142,10 +144,14 @@ register_data_listener(Pid, Offset) -> ok = gen_batch_server:cast(Pid, {register_data_listener, self(), Offset}). --spec ack(identifier(), {osiris:offset(), osiris:timestamp()}) -> ok. +-spec ack(identifier(), {osiris:offset(), osiris:timestamp()} | + osiris:tail_info()) -> ok. ack(LeaderPid, {Offset, _} = OffsetTs) when is_integer(Offset) andalso Offset >= 0 -> - gen_batch_server:cast(LeaderPid, {ack, node(), OffsetTs}). + gen_batch_server:cast(LeaderPid, {ack, node(), OffsetTs}); +ack(LeaderPid, ?TAIL_INFO(ChkId) = TailInfo) + when is_integer(ChkId) andalso ChkId >= 0 -> + gen_batch_server:cast(LeaderPid, {ack, node(), TailInfo}). -spec write(Pid :: pid(), Data :: osiris:data()) -> ok. 
write(Pid, Data) @@ -223,25 +229,26 @@ handle_continue(#{name := Name, counters:add(CntRef, ?C_READERS, Inc) end), Trk = osiris_log:recover_tracking(Log), - %% should this not be be chunk id rather than last offset? - LastOffs = osiris_log:next_offset(Log) - 1, - CommittedOffset = + {LastChunkId, NextOffset} = case osiris_log:tail_info(Log) of - {_, {_, TailChId, _}} when Replicas == [] -> + ?TAIL_INFO_NEXT(TailNextOffs, TailChId) when Replicas == [] -> %% only when there are no replicas can we %% recover the committed offset from the last %% batch offset in the log - TailChId; + {TailChId, TailNextOffs}; _ -> - -1 + {-1, 0} end, - ok = osiris_log:set_committed_chunk_id(Log, CommittedOffset), - counters:put(CntRef, ?C_COMMITTED_OFFSET, CommittedOffset), + LastOffset = NextOffset - 1, + ok = osiris_log:set_committed_chunk_id(Log, LastChunkId), + ok = osiris_log:set_committed_offset(Log, LastOffset), + counters:put(CntRef, ?C_COMMITTED_CHUNK_ID, LastChunkId), + counters:put(CntRef, ?C_COMMITTED_OFFSET, LastOffset), counters:put(CntRef, ?C_EPOCH, Epoch), EvtFmt = maps:get(event_formatter, Config, undefined), - ?INFO("osiris_writer:init/1: name: ~ts last offset: ~b " + ?INFO("osiris_writer:init/1: name: ~ts committed offset: ~b " "committed chunk id: ~b epoch: ~b", - [Name, LastOffs, CommittedOffset, Epoch]), + [Name, LastOffset, LastChunkId, Epoch]), {ok, #?MODULE{cfg = #cfg{name = Name, @@ -252,15 +259,15 @@ handle_continue(#{name := Name, replicas = Replicas, directory = Dir, counter = CntRef}, - committed_offset = CommittedOffset, - replica_state = maps:from_list([{R, {-1, 0}} || R <- Replicas]), + committed_chunk_id = LastChunkId, + replica_state = maps:from_list([{R, {0, {0, -1, 0}}} || R <- Replicas]), log = Log, tracking = Trk}}. 
handle_batch(Commands, #?MODULE{cfg = #cfg{counter = Cnt} = Cfg, duplicates = Dupes0, - committed_offset = COffs0, + committed_chunk_id = ChkId0, tracking = Trk0} = State0) -> @@ -293,28 +300,29 @@ handle_batch(Commands, State1#?MODULE{tracking = Trk, log = Log}), - LastChId = + TailInfo = case osiris_log:tail_info(State2#?MODULE.log) of - {_, {_, TailChId, _TailTs}} -> - TailChId; + ?TAIL_INFO(_) = TI -> + TI; _ -> - -1 + {-1, {-1, -1, 0}} end, - AllChIds = maps:fold(fun (_, {O, _}, Acc) -> - [O | Acc] - end, [LastChId], State2#?MODULE.replica_state), - COffs = agreed_commit(AllChIds), + ReplicaTI = maps:values(State2#?MODULE.replica_state), + {NOffs, {_, ChkId, _}} = agreed_commit([TailInfo | ReplicaTI]), - RemDupes = handle_duplicates(COffs, Dupes ++ Dupes0, Cfg), + RemDupes = handle_duplicates(ChkId, Dupes ++ Dupes0, Cfg), %% if committed offset has increased - update - State = case COffs > COffs0 of + State = case ChkId > ChkId0 of true -> P = State2#?MODULE.pending_corrs, - ok = osiris_log:set_committed_chunk_id(Log, COffs), - counters:put(Cnt, ?C_COMMITTED_OFFSET, COffs), - Pending = notify_writers(P, COffs, Cfg), - State2#?MODULE{committed_offset = COffs, + LastOffset = NOffs - 1, + ok = osiris_log:set_committed_chunk_id(Log, ChkId), + ok = osiris_log:set_committed_offset(Log, LastOffset), + counters:put(Cnt, ?C_COMMITTED_CHUNK_ID, ChkId), + counters:put(Cnt, ?C_COMMITTED_OFFSET, LastOffset), + Pending = notify_writers(P, ChkId, Cfg), + State2#?MODULE{committed_chunk_id = ChkId, duplicates = RemDupes, pending_corrs = Pending}; false -> @@ -346,7 +354,7 @@ format_status(#?MODULE{cfg = #cfg{name = Name, replica_state = ReplicaState, data_listeners = DataListeners, offset_listeners = OffsetListeners, - committed_offset = CommittedOffset}) -> + committed_chunk_id = CommittedChunkId}) -> #{name => Name, external_reference => ExtRef, replica_nodes => Replicas, @@ -357,7 +365,7 @@ format_status(#?MODULE{cfg = #cfg{name = Name, num_pending_correlations => 
queue:len(PendingCorrs), num_data_listeners => length(DataListeners), num_offset_listeners => length(OffsetListeners), - committed_offset => CommittedOffset + committed_chunk_id => CommittedChunkId }. %% Internal @@ -469,7 +477,7 @@ handle_command({cast, State0#?MODULE{offset_listeners = [{Pid, Offset, EvtFormatter} | Listeners]}, {State, Records, Replies, Corrs, Trk, Dupes}; -handle_command({cast, {ack, ReplicaNode, {Offset, _} = OffsetTs}}, +handle_command({cast, {ack, ReplicaNode, TailInfoOrOffsTs}}, {#?MODULE{replica_state = ReplicaState0} = State0, Records, Replies, @@ -477,10 +485,17 @@ handle_command({cast, {ack, ReplicaNode, {Offset, _} = OffsetTs}}, Trk, Dupes}) -> % ?DEBUG("osiris_writer ack from ~w at ~b", [ReplicaNode, Offset]), + %% ack message can be a tail_info or a 2-term tuple + {Offset, TI} = case TailInfoOrOffsTs of + ?TAIL_INFO(ChkId) -> + {ChkId, TailInfoOrOffsTs}; + {Offs, Ts} -> + {Offs, {0, {0, Offs, Ts}}} + end, ReplicaState = case ReplicaState0 of - #{ReplicaNode := {O, _}} when Offset > O -> - ReplicaState0#{ReplicaNode => OffsetTs}; + #{ReplicaNode := ?TAIL_INFO(O)} when Offset > O -> + ReplicaState0#{ReplicaNode => TI}; _ -> %% ignore anything else including acks from unknown replicas ReplicaState0 @@ -493,8 +508,8 @@ handle_command({call, From, get_reader_context}, name = Name, directory = Dir, counter = CntRef}, - log = Log, - committed_offset = COffs} = + committed_chunk_id = CChkId, + log = Log} = State, Records, Replies, @@ -506,7 +521,8 @@ handle_command({call, From, get_reader_context}, {reply, From, #{dir => Dir, name => Name, - committed_offset => max(0, COffs), + committed_chunk_id => max(0, CChkId), + committed_offset => max(0, osiris_log:committed_offset(Log)), shared => Shared, reference => Ref, readers_counter_fun => fun(Inc) -> counters:add(CntRef, ?C_READERS, Inc) end @@ -530,14 +546,17 @@ handle_command({call, From, query_replication_state}, {#?MODULE{log = Log, replica_state = R} = State, Records, Replies0, Corrs, Trk, 
Dupes}) -> + Result0 = maps:map(fun(_, ?TAIL_INFO(O, T)) -> + {O, T} + end, R), %% need to merge pending tracking entries before read - Result = case osiris_log:tail_info(Log) of - {0, empty} -> - R#{node() => {-1, 0}}; - {_, {_E, O, T}} -> - R#{node() => {O, T}} - end, - Replies = [{reply, From, Result} | Replies0], + Result1 = case osiris_log:tail_info(Log) of + {0, empty} -> + Result0#{node() => {-1, 0}}; + ?TAIL_INFO(O, T) -> + Result0#{node() => {O, T}} + end, + Replies = [{reply, From, Result1} | Replies0], {State, Records, Replies, Corrs, Trk, Dupes}; handle_command({call, From, {update_retention, Retention}}, {#?MODULE{log = Log0} = State, @@ -566,7 +585,7 @@ notify_data_listeners(#?MODULE{log = Seg, data_listeners = L0} = notify_offset_listeners(#?MODULE{cfg = #cfg{reference = Ref, event_formatter = EvtFmt}, - committed_offset = COffs, + committed_chunk_id = COffs, offset_listeners = L0} = State) -> {Notify, L} = @@ -621,9 +640,11 @@ wrap_osiris_event(undefined, Evt) -> wrap_osiris_event({M, F, A}, Evt) -> apply(M, F, [Evt | A]). --spec agreed_commit([osiris:offset()]) -> osiris:offset(). +-spec agreed_commit([osiris:tail_info()]) -> osiris:tail_info(). agreed_commit(Indexes) -> - SortedIdxs = lists:sort(fun erlang:'>'/2, Indexes), + SortedIdxs = lists:sort(fun(?TAIL_INFO(ChkId0), ?TAIL_INFO(ChkId1)) -> + ChkId0 > ChkId1 + end, Indexes), Nth = length(SortedIdxs) div 2 + 1, lists:nth(Nth, SortedIdxs). diff --git a/test/osiris_SUITE.erl b/test/osiris_SUITE.erl index 587788f5..9471866e 100644 --- a/test/osiris_SUITE.erl +++ b/test/osiris_SUITE.erl @@ -87,7 +87,10 @@ all_tests() -> cluster_reader_counters, combine_ips_hosts_test, empty_last_segment, - replica_reader_nodedown_noproc]. + replica_reader_nodedown_noproc, + committed_offset_calculate_with_single_node, + committed_offset_calculate_with_cluster, + cluster_should_work_with_committed_offset_calculate_on_off]. 
%% Isolated to avoid test interference ipv6_tests() -> @@ -852,9 +855,9 @@ cluster_restart_large(Config) -> %% wait for replica key counters to match writer await_condition(fun () -> - RC = erpc:call(hd(Replicas), osiris_counters, overview, - [{osiris_replica, Name}]), - maps:with(Keys, CountersPre) == maps:with(Keys, RC) + RC = erpc:call(hd(Replicas), osiris_counters, counters, + [{osiris_replica, Name}, Keys]), + maps:with(Keys, CountersPre) == RC end, 1000, 20), Replica1CountersPre = erpc:call(hd(Replicas), osiris_counters, overview, [{osiris_replica, Name}]), @@ -1918,6 +1921,151 @@ replica_reader_nodedown_noproc(_Config) -> osiris_replica_reader:start(node(), #{}), ok. +committed_offset_calculate_with_single_node(Config) -> + Name = ?config(cluster_name, Config), + WriterConf = #{name => Name, + reference => Name, + epoch => 1, + leader_node => node(), + replica_nodes => []}, + + {ok, Pid} = osiris:start_writer(WriterConf), + + ?assertEqual(-1, committed_offset(Pid)), + + Num = 100, + write_n(Pid, Num, #{}), + + await_condition(fun() -> + committed_offset(Pid) =:= Num - 1 + end, 100, 20), + + write_n(Pid, Num, #{}), + + await_condition(fun() -> + committed_offset(Pid) =:= 2 * Num - 1 + end, 100, 20), + + ok = osiris:stop_cluster(WriterConf), + + {ok, Pid2} = osiris:start_writer(WriterConf), + ?assertEqual(2 * Num - 1, committed_offset(Pid2)), + + ok = osiris:stop_cluster(WriterConf), + + ok. 
+ +committed_offset_calculate_with_cluster(Config) -> + PrivDir = ?config(data_dir, Config), + Name = ?config(cluster_name, Config), + PeerStates = [start_child_node(N, PrivDir, application:get_all_env(osiris)) + || N <- [s1, s2, s3]], + [LeaderNode, Replica1Node, Replica2Node] = [NodeName || {_Ref, NodeName} <- PeerStates], + Epoch = 1, + ReplicaNodes = [Replica1Node, Replica2Node], + + WriterConf = #{name => Name, + reference => Name, + epoch => Epoch, + leader_node => LeaderNode, + replica_nodes => ReplicaNodes}, + + {ok, LeaderPid} = osiris:start_writer(WriterConf), + + ReplicaConf = WriterConf#{leader_pid => LeaderPid, + features => #{committed_offset_calculate => true}}, + {ok, Replica1Pid} = osiris_replica:start(Replica1Node, ReplicaConf), + {ok, Replica2Pid} = osiris_replica:start(Replica2Node, ReplicaConf), + Pids = [LeaderPid, Replica1Pid, Replica2Pid], + + [?assertEqual(-1, committed_offset(Pid)) || Pid <- Pids], + + Num = 100, + write_n(LeaderPid, Num, #{}), + + [await_condition(fun() -> + committed_offset(Pid) =:= Num - 1 + end, 100, 20) || Pid <- Pids], + + write_n(LeaderPid, Num, #{}), + + [await_condition(fun() -> + committed_offset(Pid) =:= 2 * Num - 1 + end, 100, 20) || Pid <- Pids], + + ok = osiris:stop_cluster(WriterConf), + + {ok, LeaderPid2} = osiris:start_writer(WriterConf), + ReplicaConf2 = ReplicaConf#{leader_pid => LeaderPid2}, + {ok, Replica1Pid2} = osiris_replica:start(Replica1Node, ReplicaConf2), + {ok, Replica2Pid2} = osiris_replica:start(Replica2Node, ReplicaConf2), + Pids2 = [LeaderPid2, Replica1Pid2, Replica2Pid2], + + %% committed offset is recovered via async replica acks after restart + [await_condition(fun() -> committed_offset(Pid) =:= 2 * Num - 1 end, 100, 20) || Pid <- Pids2], + + ok = osiris:stop_cluster(WriterConf), + + [stop_peer(Ref) || {Ref, _} <- PeerStates], + ok. 
+ +cluster_should_work_with_committed_offset_calculate_on_off(Config) -> + PrivDir = ?config(data_dir, Config), + Name = ?config(cluster_name, Config), + PeerStates = [start_child_node(N, PrivDir, application:get_all_env(osiris)) + || N <- [s1, s2, s3]], + [LeaderNode, Replica1Node, Replica2Node] = [NodeName || {_Ref, NodeName} <- PeerStates], + Epoch = 1, + ReplicaNodes = [Replica1Node, Replica2Node], + + WriterConf = #{name => Name, + reference => Name, + epoch => Epoch, + leader_node => LeaderNode, + replica_nodes => ReplicaNodes}, + + {ok, LeaderPid} = osiris:start_writer(WriterConf), + + ReplicaConf = WriterConf#{leader_pid => LeaderPid}, + Features = #{committed_offset_calculate => true}, + {ok, Replica1Pid} = osiris_replica:start(Replica1Node, + ReplicaConf#{features => Features}), + {ok, Replica2Pid} = osiris_replica:start(Replica2Node, ReplicaConf), + Pids = [LeaderPid, Replica1Pid, Replica2Pid], + + [?assertEqual(-1, committed_offset(Pid)) || Pid <- Pids], + + Num = 100, + write_n(LeaderPid, Num, #{}), + + %% there is no guarantee the last committed offset is up-to-date + %% when not all the members are configured to support it, + %% so we check the committed chunk ID to make sure the cluster works properly + + [await_condition(fun() -> + committed_chunk_id(Pid) > 0 + end, 100, 20) || Pid <- Pids], + + write_n(LeaderPid, Num, #{}), + + [await_condition(fun() -> + committed_chunk_id(Pid) > Num + end, 100, 20) || Pid <- Pids], + + ok = osiris:stop_cluster(WriterConf), + + {ok, LeaderPid2} = osiris:start_writer(WriterConf), + ReplicaConf2 = ReplicaConf#{leader_pid => LeaderPid2}, + {ok, Replica1Pid2} = osiris_replica:start(Replica1Node, ReplicaConf2), + {ok, Replica2Pid2} = osiris_replica:start(Replica2Node, ReplicaConf2), + Pids2 = [LeaderPid2, Replica1Pid2, Replica2Pid2], + + %% committed chunk id is re-established via async replica acks after restart + [await_condition(fun() -> committed_chunk_id(Pid) > Num end, 100, 20) || Pid <- Pids2], + + ok = osiris:stop_cluster(WriterConf), + + [stop_peer(Ref) || {Ref, _} <- PeerStates], + ok. 
+ %% Utility write_n(Pid, N, Written) -> @@ -2237,3 +2385,9 @@ truncate(File, Sz) -> ok = file:truncate(Fd), ok = file:close(Fd), ok. + +committed_offset(Pid) -> + maps:get(committed_offset, osiris:get_stats(Pid)). + +committed_chunk_id(Pid) -> + maps:get(committed_chunk_id, osiris:get_stats(Pid)).