Skip to content

Commit

Permalink
CQ: Write large messages into their own files
Browse files Browse the repository at this point in the history
  • Loading branch information
lhoguin committed May 16, 2024
1 parent 6019e75 commit 0575002
Showing 1 changed file with 125 additions and 41 deletions.
166 changes: 125 additions & 41 deletions deps/rabbit/src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ handle_cast({write, CRef, MsgRef, MsgId, Flow},
%% or the non-current files. If the message *is* in the
%% current file then the cache entry will be removed by
%% the normal logic for that in write_message/4 and
%% maybe_roll_to_new_file/2.
%% flush_or_roll_to_new_file/2.
case index_lookup(MsgId, State) of
[#msg_location { file = File }]
when File == State #msstate.current_file ->
Expand Down Expand Up @@ -1208,26 +1208,123 @@ gc_candidate(File, State = #msstate{ gc_candidates = Candidates,
gc_candidate(File, State = #msstate{ gc_candidates = Candidates }) ->
State#msstate{ gc_candidates = Candidates#{ File => true }}.

write_message(MsgId, Msg,
State0 = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
current_file_offset = CurOffset,
file_summary_ets = FileSummaryEts }) ->
{MaybeFlush, TotalSize} = writer_append(CurHdl, MsgId, Msg),
State = case MaybeFlush of
flush -> internal_sync(State0);
ok -> State0
end,
%% This value must be smaller enough than ?SCAN_BLOCK_SIZE
%% to ensure we only ever need 2 reads when scanning files.
%% Hence the choice of 4MB here and 4MiB there, the difference
%% in size being more than enough to ensure that property.
-define(LARGE_MESSAGE_THRESHOLD, 4000000). %% 4MB.

write_message(MsgId, MsgBody, State) ->
MsgBodyBin = term_to_binary(MsgBody),
%% Large messages get written to their own files.
if
byte_size(MsgBodyBin) >= ?LARGE_MESSAGE_THRESHOLD ->
write_large_message(MsgId, MsgBodyBin, State);
true ->
write_small_message(MsgId, MsgBodyBin, State)
end.

write_small_message(MsgId, MsgBodyBin,
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
current_file_offset = CurOffset,
file_summary_ets = FileSummaryEts }) ->
{MaybeFlush, TotalSize} = writer_append(CurHdl, MsgId, MsgBodyBin),
ok = index_insert(
#msg_location { msg_id = MsgId, ref_count = 1, file = CurFile,
offset = CurOffset, total_size = TotalSize }, State),
[_,_] = ets:update_counter(FileSummaryEts, CurFile,
[{#file_summary.valid_total_size, TotalSize},
{#file_summary.file_size, TotalSize}]),
maybe_roll_to_new_file(CurOffset + TotalSize,
flush_or_roll_to_new_file(CurOffset + TotalSize, MaybeFlush,
State #msstate {
current_file_offset = CurOffset + TotalSize }).

flush_or_roll_to_new_file(
Offset, _MaybeFlush,
State = #msstate { dir = Dir,
current_file_handle = CurHdl,
current_file = CurFile,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
file_size_limit = FileSizeLimit })
when Offset >= FileSizeLimit ->
State1 = internal_sync(State),
ok = writer_close(CurHdl),
NextFile = CurFile + 1,
{ok, NextHdl} = writer_open(Dir, NextFile),
true = ets:insert_new(FileSummaryEts, #file_summary {
file = NextFile,
valid_total_size = 0,
file_size = 0,
locked = false }),
%% Delete messages from the cache that were written to disk.
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
State1 #msstate { current_file_handle = NextHdl,
current_file = NextFile,
current_file_offset = 0 };
%% If we need to flush, do so here.
flush_or_roll_to_new_file(_, flush, State) ->
internal_sync(State);
flush_or_roll_to_new_file(_, _, State) ->
State.

write_large_message(MsgId, MsgBodyBin,
State0 = #msstate { dir = Dir,
current_file_handle = CurHdl,
current_file = CurFile,
current_file_offset = CurOffset,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts }) ->
{LargeMsgFile, LargeMsgHdl} = case CurOffset of
%% We haven't written in the file yet. Use it.
0 ->
{CurFile, CurHdl};
%% Flush the current file and close it. Open a new file.
_ ->
ok = writer_flush(CurHdl),
ok = writer_close(CurHdl),
LargeMsgFile0 = CurFile + 1,
{ok, LargeMsgHdl0} = writer_open(Dir, LargeMsgFile0),
{LargeMsgFile0, LargeMsgHdl0}
end,
%% Write the message directly and close the file.
TotalSize = writer_direct_write(LargeMsgHdl, MsgId, MsgBodyBin),
ok = writer_close(LargeMsgHdl),
%% Update ets with the new information.
ok = index_insert(
#msg_location { msg_id = MsgId, ref_count = 1, file = LargeMsgFile,
offset = 0, total_size = TotalSize }, State0),
_ = case CurFile of
%% We didn't open a new file. We must update the existing value.
LargeMsgFile ->
[_,_] = ets:update_counter(FileSummaryEts, LargeMsgFile,
[{#file_summary.valid_total_size, TotalSize},
{#file_summary.file_size, TotalSize}]);
%% We opened a new file. We can insert it all at once.
_ ->
true = ets:insert_new(FileSummaryEts, #file_summary {
file = LargeMsgFile,
valid_total_size = TotalSize,
file_size = TotalSize,
locked = false })
end,
%% Roll over to the next file.
NextFile = LargeMsgFile + 1,
{ok, NextHdl} = writer_open(Dir, NextFile),
true = ets:insert_new(FileSummaryEts, #file_summary {
file = NextFile,
valid_total_size = 0,
file_size = 0,
locked = false }),
%% Delete messages from the cache that were written to disk.
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
%% Process confirms (this won't flush; we already did) and continue.
State = internal_sync(State0),
State #msstate { current_file_handle = NextHdl,
current_file = NextFile,
current_file_offset = 0 }.

contains_message(MsgId, From, State) ->
MsgLocation = index_lookup_positive_ref_count(MsgId, State),
gen_server2:reply(From, MsgLocation =/= not_found),
Expand Down Expand Up @@ -1325,8 +1422,7 @@ writer_recover(Dir, Num, Offset) ->
ok = file:truncate(Fd),
{ok, #writer{fd = Fd, buffer = prim_buffer:new()}}.

writer_append(#writer{buffer = Buffer}, MsgId, MsgBody) ->
MsgBodyBin = term_to_binary(MsgBody),
writer_append(#writer{buffer = Buffer}, MsgId, MsgBodyBin) ->
MsgBodyBinSize = byte_size(MsgBodyBin),
EntrySize = MsgBodyBinSize + 16, %% Size of MsgId + MsgBodyBin.
%% We send an iovec to the buffer instead of building a binary.
Expand Down Expand Up @@ -1354,6 +1450,21 @@ writer_flush(#writer{fd = Fd, buffer = Buffer}) ->
file:write(Fd, prim_buffer:read_iovec(Buffer, Size))
end.

%% For large messages we don't buffer anything. Large messages
%% are kept within their own files.
%%
%% This is basically the same as writer_append except no buffering.
writer_direct_write(#writer{fd = Fd}, MsgId, MsgBodyBin) ->
MsgBodyBinSize = byte_size(MsgBodyBin),
EntrySize = MsgBodyBinSize + 16, %% Size of MsgId + MsgBodyBin.
file:write(Fd, [
<<EntrySize:64>>,
MsgId,
MsgBodyBin,
<<255>> %% OK marker.
]),
EntrySize + 9.

writer_close(#writer{fd = Fd}) ->
file:close(Fd).

Expand Down Expand Up @@ -1700,33 +1811,6 @@ rebuild_index(Gatherer, Files, State) ->
%% garbage collection / compaction / aggregation -- internal
%%----------------------------------------------------------------------------

maybe_roll_to_new_file(
Offset,
State = #msstate { dir = Dir,
current_file_handle = CurHdl,
current_file = CurFile,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
file_size_limit = FileSizeLimit })
when Offset >= FileSizeLimit ->
State1 = internal_sync(State),
ok = writer_close(CurHdl),
NextFile = CurFile + 1,
{ok, NextHdl} = writer_open(Dir, NextFile),
true = ets:insert_new(FileSummaryEts, #file_summary {
file = NextFile,
valid_total_size = 0,
file_size = 0,
locked = false }),
%% We only delete messages from the cache that were written to disk
%% in the previous file.
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
State1 #msstate { current_file_handle = NextHdl,
current_file = NextFile,
current_file_offset = 0 };
maybe_roll_to_new_file(_, State) ->
State.

%% We keep track of files that have seen removes and
%% check those periodically for compaction. We only
%% compact files that have less than half valid data.
Expand Down

0 comments on commit 0575002

Please sign in to comment.