Skip to content

Commit

Permalink
Fix/backup restore (#7)
Browse files Browse the repository at this point in the history
* Fix - deps

* (fix) Handle backup and restore correctly
  • Loading branch information
leonardb authored Jun 11, 2020
1 parent c395048 commit eef87c1
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 97 deletions.
6 changes: 5 additions & 1 deletion rebar.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
{"1.1.0",
[{<<"sext">>,{pkg,<<"sext">>,<<"1.6.0">>},0}]}.
[{<<"erlfdb">>,
{git,"https://github.com/leonardb/couchdb-erlfdb.git",
{ref,"6a84378bd8637b58150c95b1905fd7257618f8dd"}},
0},
{<<"sext">>,{pkg,<<"sext">>,<<"1.6.0">>},0}]}.
[
{pkg_hash,[
{<<"sext">>, <<"E5466AC5A9622AB1A6BCA4311913515082BB0F6A6799C73400906C0CD141AD5D">>}]}
Expand Down
95 changes: 56 additions & 39 deletions src/mnesia_fdb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ register() ->
register(default_alias()).

register(Alias) ->
?dbg("~p : register(~p)", [self(), Alias]),
Module = ?MODULE,
case mnesia:add_backend_type(Alias, Module) of
{atomic, ok} ->
Expand All @@ -175,8 +176,8 @@ show_table(Tab) ->
show_table(Alias, Tab) ->
show_table(Alias, Tab, 100).

show_table(_Alias, Tab, Limit) ->
#st{db = Db, tab = Tab, table_id = TableId, type = Type} = mnesia_fdb_manager:st(Tab),
show_table(_Alias, Tab0, Limit) ->
#st{db = Db, table_id = TableId, type = Type} = mnesia_fdb_manager:st(Tab0),
DPfx = ?DATA_PREFIX(Type),
StartKey = sext:prefix({TableId, DPfx, ?FDB_WC}),
i_show_table(Db, TableId, StartKey, Limit).
Expand Down Expand Up @@ -295,6 +296,8 @@ check_definition_entry(_Tab, _Id, {type, T} = P) when T==set; T==ordered_set; T=
[self(), _Tab, _Id, {type, T}, pp_stack()]),
P;
check_definition_entry(Tab, Id, {type, T}) ->
?dbg("~p: check_definition_entry(~p, ~p, ~p);~n Trace: ~s~n",
[self(), Tab, Id, {type, T}, pp_stack()]),
mnesia:abort({combine_error, Tab, [Id, {type, T}]});
check_definition_entry(_Tab, _Id, {user_properties, UPs} = P) ->
FdbOpts = proplists:get_value(fdb_opts, UPs, []),
Expand All @@ -317,9 +320,13 @@ check_definition_entry(_Tab, _Id, P) ->
P.

create_table(_Alias, Tab, Props) ->
?dbg("~p :: create_table(~p, ~p, ~p)", [self(), _Alias, Tab, Props]),
mnesia_fdb_manager:create(Tab, Props).

load_table(_Alias, Tab, restore, Props) ->
mnesia_fdb_manager:create(Tab, Props);
load_table(_Alias, Tab, _LoadReason, _Opts) ->
?dbg("~p :: load_table(~p, ~p, ~p, ~p)", [self(), _Alias, Tab, _LoadReason, _Opts]),
case mnesia_fdb_manager:st(Tab) of
#st{} ->
%% Table is loaded
Expand All @@ -329,6 +336,7 @@ load_table(_Alias, Tab, _LoadReason, _Opts) ->
end.

close_table(_Alias, _Tab) ->
?dbg("~p: close_table(~p, ~p)~n", [self(), _Alias, _Tab]),
ok.

-ifndef(MNESIA_FDB_NO_DBG).
Expand Down Expand Up @@ -366,16 +374,25 @@ sync_close_table(Alias, Tab) ->
delete_table(Alias, Tab) ->
?dbg("~p: delete_table(~p, ~p);~n Trace: ~s~n",
[self(), Alias, Tab, pp_stack()]),
ok = mnesia_fdb_manager:delete(Tab).
case mnesia_fdb_manager:load_if_exists(Tab) of
ok ->
mnesia_fdb_manager:delete(Tab);
{error, not_found} ->
ok
end.

info(_Alias, _Tab, memory) ->
?dbg("~p: info(~p, ~p, ~p)~n", [self(), _Alias, _Tab, memory]),
0;
info(_Alias, _Tab, size) ->
?dbg("~p: info(~p, ~p, ~p)~n", [self(), _Alias, _Tab, size]),
0;
info(_Alias, _Tab, _Item) ->
?dbg("~p: info(~p, ~p, ~p)~n", [self(), _Alias, _Tab, _Item]),
undefined.

write_info(_Alias, _Tab, _Key, _Value) ->
?dbg("~p: write_info(~p, ~p, ~p, ~p)~n", [self(), _Alias, _Tab, _Key, _Value]),
ok.

%% table sync calls
Expand Down Expand Up @@ -416,6 +433,7 @@ write_info(_Alias, _Tab, _Key, _Value) ->
%%

sender_init(Alias, Tab, _RemoteStorage, _Pid) ->
?dbg("~p: sender_init(~p, ~p, ~p, ~p)~n", [self(), Alias, Tab, _RemoteStorage, _Pid]),
%% Need to send a message to the receiver. It will be handled in
%% receiver_first_message/4 below. There could be a volley of messages...
{standard,
Expand All @@ -439,6 +457,7 @@ receive_data(Data, Alias, Tab, _Sender, State) ->
{more, State}.

receive_done(_Alias, _Tab, _Sender, _State) ->
?dbg("~p : receiver_done(~p, ~p, ~p, ~p)", [self(), _Alias, _Tab, _Sender, _State]),
ok.

%% End of table synch protocol
Expand All @@ -460,13 +479,14 @@ delete(Alias, Tab, Key) ->

%% Not relevant for an ordered_set
fixtable(_Alias, _Tab, _Bool) ->
?dbg("~p : fixtable(~p, ~p, ~p)", [self(),_Alias, _Tab, _Bool]),
?dbg("~p : fixtable(~p, ~p, ~p)", [self(), _Alias, _Tab, _Bool]),
ok.

%% To save storage space, we avoid storing the key twice. The key
%% in the record is replaced with []. It has to be put back in lookup/3.
insert(_Alias, Tab, Obj) ->
#st{tab = Tab} = St = mnesia_fdb_manager:st(Tab),
insert(_Alias, Tab0, Obj) ->
?dbg("~p : insert(~p, ~p, ~p)", [self(),_Alias, Tab0, Obj]),
#st{tab = Tab} = St = mnesia_fdb_manager:st(Tab0),
%% St = call(Alias, Tab, get_st),
Pos = keypos(Tab),
Key = element(Pos, Obj),
Expand All @@ -475,12 +495,12 @@ insert(_Alias, Tab, Obj) ->

%% Since the key is replaced with [] in the record, we have to put it back
%% into the found record.
lookup(Alias, Tab, Key) ->
?dbg("~p : lookup(~p, ~p, ~p)", [self(), Alias, Tab, Key]),
#st{db = Db, tab = Tab, table_id = TableId, type = Type} = mnesia_fdb_manager:st(Tab),
lookup(Alias, Tab0, Key) ->
?dbg("~p : lookup(~p, ~p, ~p)", [self(), Alias, Tab0, Key]),
#st{db = Db, tab = Tab, table_id = TableId, type = Type} = mnesia_fdb_manager:st(Tab0),
case Type of
bag ->
lookup_bag(Db, TableId, Key, keypos(Tab));
lookup_bag(Db, TableId, Key, keypos(Tab0));
_ ->
EncKey = sext:encode({TableId, ?DATA_PREFIX(Type), Key}),
case erlfdb:get(Db, EncKey) of
Expand All @@ -506,12 +526,12 @@ lookup_bag(Db, TableId, Key, KeyPos) ->
end || {EKey, EVal} <- Rows]
end.

match_delete(Alias, Tab, Pat) when is_atom(Pat) ->
?dbg("~p : match_delete(~p, ~p, ~p)", [self(), Alias, Tab, Pat]),
match_delete(Alias, Tab0, Pat) when is_atom(Pat) ->
?dbg("~p : match_delete(~p, ~p, ~p)", [self(), Alias, Tab0, Pat]),
%%do_match_delete(Alias, Tab, ?FDB_START),
case is_wild(Pat) of
true ->
#st{db = Db, table_id = TableId} = mnesia_fdb_manager:st(Tab),
#st{db = Db, table_id = TableId} = mnesia_fdb_manager:st(Tab0),
%% @TODO need to verify if the 'parts' of large values are also removed
Prefixes = [{TableId},
{TableId, ?FDB_WC},
Expand All @@ -524,10 +544,10 @@ match_delete(Alias, Tab, Pat) when is_atom(Pat) ->
?dbg("is_wild failed on Pat ~p", [Pat]),
error(badarg)
end;
match_delete(_Alias, Tab, Pat) when is_tuple(Pat) ->
KP = keypos(Tab),
match_delete(_Alias, Tab0, Pat) when is_tuple(Pat) ->
KP = keypos(Tab0),
Key = element(KP, Pat),
#st{db = Db, table_id = TableId} = St = mnesia_fdb_manager:st(Tab),
#st{db = Db, table_id = TableId} = St = mnesia_fdb_manager:st(Tab0),
case is_wild(Key) of
true ->
%% @TODO need to verify if the 'parts' of large values are also removed
Expand All @@ -542,9 +562,9 @@ match_delete(_Alias, Tab, Pat) when is_tuple(Pat) ->
end,
ok.

first(Alias, Tab) ->
?dbg("~p : first(~p, ~p)", [self(), Alias, Tab]),
#st{db = Db, tab = Tab, table_id = TableId, type = Type} = mnesia_fdb_manager:st(Tab),
first(Alias, Tab0) ->
?dbg("~p : first(~p, ~p)", [self(), Alias, Tab0]),
#st{db = Db, table_id = TableId, type = Type} = mnesia_fdb_manager:st(Tab0),
StartKey = sext:prefix({TableId, ?DATA_PREFIX(Type), ?FDB_WC}),
EndKey = erlfdb_key:strinc(StartKey),
case erlfdb:get_range(Db, StartKey, EndKey, [{limit,1}]) of
Expand All @@ -554,9 +574,9 @@ first(Alias, Tab) ->
mnesia_fdb_lib:decode_key(EncKey, TableId)
end.

next(Alias, Tab, Key) ->
?dbg("~p : next(~p, ~p, ~p)", [self(), Alias, Tab, Key]),
#st{db = Db, tab = Tab, table_id = TableId, type = Type} = mnesia_fdb_manager:st(Tab),
next(Alias, Tab0, Key) ->
?dbg("~p : next(~p, ~p, ~p)", [self(), Alias, Tab0, Key]),
#st{db = Db, table_id = TableId, type = Type} = mnesia_fdb_manager:st(Tab0),
SKey = sext:encode({TableId, ?DATA_PREFIX(Type), Key}),
EKey = erlfdb_key:strinc(sext:prefix({TableId, ?DATA_PREFIX(Type), ?FDB_WC})),
case erlfdb:get_range(Db, SKey, EKey, [{limit, 2}]) of
Expand All @@ -575,9 +595,9 @@ next(Alias, Tab, Key) ->
hd(lists:delete(Key, [Key0, Key1]))
end.

prev(Alias, Tab, Key) ->
?dbg("~p : prev(~p, ~p, ~p)", [self(), Alias, Tab, Key]),
#st{db = Db, tab = Tab, table_id = TableId, type = Type} = mnesia_fdb_manager:st(Tab),
prev(Alias, Tab0, Key) ->
?dbg("~p : prev(~p, ~p, ~p)", [self(), Alias, Tab0, Key]),
#st{db = Db, table_id = TableId, type = Type} = mnesia_fdb_manager:st(Tab0),
SKey = sext:prefix({TableId, ?DATA_PREFIX(Type), ?FDB_WC}),
EKey = erlfdb_key:strinc(sext:prefix({TableId, ?DATA_PREFIX(Type), Key})),
case erlfdb:get_range(Db, SKey, EKey, [{reverse, true}, {limit, 2}]) of
Expand All @@ -596,9 +616,9 @@ prev(Alias, Tab, Key) ->
hd(lists:delete(Key, [Key0, Key1]))
end.

last(Alias, Tab) ->
?dbg("~p : last(~p, ~p)", [self(), Alias, Tab]),
#st{db = Db, tab = Tab, table_id = TableId, type = Type} = mnesia_fdb_manager:st(Tab),
last(Alias, Tab0) ->
?dbg("~p : last(~p, ~p)", [self(), Alias, Tab0]),
#st{db = Db, table_id = TableId, type = Type} = mnesia_fdb_manager:st(Tab0),
SKey = sext:prefix({TableId, ?DATA_PREFIX(Type), ?FDB_WC}),
EKey = erlfdb_key:strinc(SKey),
case erlfdb:get_range(Db, SKey, EKey, [{reverse, true}, {limit, 1}]) of
Expand Down Expand Up @@ -632,9 +652,9 @@ select(Alias, Tab, Ms) ->
'$end_of_table'
end.

select(Alias, Tab, Ms, Limit) when is_integer(Limit) ->
#st{db = Db, table_id = TableId, tab = Tab, type = Type} = mnesia_fdb_manager:st(Tab),
?dbg("~p : select(~p, ~p, ~p, ~p) mnesia_activity: ~p", [self(),Alias, Tab, Ms, Limit, get(mnesia_activity_state)]),
select(Alias, Tab0, Ms, Limit) when is_integer(Limit) ->
?dbg("~p : select(~p, ~p, ~p, ~p)", [self(),Alias, Tab0, Ms, Limit]),
#st{db = Db, table_id = TableId, tab = Tab, type = Type} = mnesia_fdb_manager:st(Tab0),
do_select(Db, TableId, Tab, Type, Ms, Limit).

slot(_A, _B, _C) ->
Expand All @@ -648,7 +668,7 @@ update_counter(Alias, Tab, Key, Incr) when is_integer(Incr) ->

%% server-side part
do_update_counter(_Key, _Incr, #st{type = bag}) ->
mnesia:abort(counter_not_supported_for_bag);
mnesia:abort(badarg);
do_update_counter(Key, Incr, #st{db = Db, table_id = TableId}) when is_integer(Incr) ->
%% Atomic counter increment
%% Mnesia counters are dirty only, and cannot go below zero
Expand Down Expand Up @@ -716,11 +736,13 @@ validate_record(_Alias, _Tab, RecName, Arity, Type, _Obj) ->
%% Extensions for files that are permanent. Needs to be cleaned up
%% e.g. at deleting the schema.
real_suffixes() ->
?dbg("~p : real_suffixes()", [self()]),
[].

%% Extensions for temporary files. Can be cleaned up when mnesia
%% cleans up other temporary files.
tmp_suffixes() ->
?dbg("~p : tmp_suffixes()", [self()]),
[].


Expand Down Expand Up @@ -893,11 +915,6 @@ extract_vars(T) when is_atom(T) ->
extract_vars(_) ->
[].

any_in_body(Vars, BodyVars) ->
lists:any(fun({_,Vs}) ->
intersection(Vs, BodyVars) =/= []
end, Vars).

intersection(A,B) when is_list(A), is_list(B) ->
A -- (A -- B).

Expand Down Expand Up @@ -1059,8 +1076,8 @@ decode_val(Db, <<"mfdb_ref", Key/binary>>) ->
decode_val(_Db, CodedVal) ->
binary_to_term(CodedVal).

fold(_Alias, Tab, Fun, Acc, MS, N) ->
#st{db = Db, tab = Tab, table_id = TableId, type = Type} = mnesia_fdb_manager:st(Tab),
fold(_Alias, Tab0, Fun, Acc, MS, N) ->
#st{db = Db, tab = Tab, table_id = TableId, type = Type} = mnesia_fdb_manager:st(Tab0),
do_fold(Db, TableId, Tab, Type, Fun, Acc, MS, N).

%% can be run on the server side.
Expand Down
Loading

0 comments on commit eef87c1

Please sign in to comment.