Skip to content

Commit 25d28e6

Browse files
Fix dialyzer warnings (#10)
* make the should_deflate flag part of the state (#2) To control if it should or not deflate, the library was accepting a flag on kpl_agg:finish/2. But, this wasn't applied to aggregated records created implicitly when adding new items. This PR makes the should_deflate part of the state, and allow it to be initialized using kpl_agg:new/1 (default is to not deflate). Note: It has a backward incompatible change, as kpl_agg:finish/2 is removed. * Avoid cuadratic complexity, keep track of current list length (#3) Do not calculate length(List) on each item addition, keep a track of the current buffer length. * Remove dialyzer warnings * Fix #stream_record.timestamp type
1 parent b9cd2f3 commit 25d28e6

File tree

5 files changed

+46
-28
lines changed

5 files changed

+46
-28
lines changed

include/erlmld.hrl

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@
2525

2626
-record(stream_record, {
2727
partition_key :: binary(),
28-
timestamp :: non_neg_integer(), % approximate arrival time (ms)
28+
timestamp :: undefined | non_neg_integer(), % approximate arrival time (ms)
2929
delay :: non_neg_integer(), % approximate delay between this record and tip of stream (ms)
3030
sequence_number :: sequence_number(),
31-
data :: binary()
31+
data :: term()
3232
}).
3333

3434
-type worker_state() :: term().

rebar.config

+15
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,18 @@
4242
]}
4343
]}
4444
]}.
45+
46+
{dialyzer, [
47+
{warnings, [unknown, no_return, error_handling]},
48+
{plt_apps, top_level_deps},
49+
{plt_extra_apps, []},
50+
{plt_location, local},
51+
{base_plt_apps, [erts, stdlib, kernel, sasl, runtime_tools, tools]},
52+
{base_plt_location, global}
53+
]}.
54+
55+
{xref_checks,[
56+
undefined_function_calls,
57+
locals_not_used,
58+
deprecated_function_calls
59+
]}.

src/erlmld.app.src

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
{applications,
1818
[kernel,
1919
stdlib,
20+
crypto,
2021
erlexec,
2122
jiffy,
2223
b64fast]},

src/erlmld_wrk_statem.erl

+6-6
Original file line numberDiff line numberDiff line change
@@ -119,23 +119,23 @@
119119
handler_data :: term(),
120120

121121
%% connected socket owned by this process:
122-
socket :: gen_tcp:socket(),
122+
socket :: undefined | gen_tcp:socket(),
123123

124124
%% input buffer; responses are small and we need no output buffer:
125-
buf = [] :: list(binary()),
125+
buf = [] :: [binary()],
126126

127127
%% worker state returned from handler module init:
128-
worker_state :: term(),
128+
worker_state :: undefined | term(),
129129

130130
%% if true, the MLD made a processRecords call with the V2 format (supplied
131131
%% millisBehindLatest), so we will checkpoint using the V2 checkpoint format:
132132
is_v2 = false :: boolean(),
133133

134134
%% most recent action name from the peer:
135-
last_request :: binary(),
135+
last_request :: undefined | binary(),
136136

137137
%% last attempted checkpoint:
138-
last_checkpoint :: checkpoint()
138+
last_checkpoint :: undefined | checkpoint()
139139
}).
140140

141141
-define(INTERNAL, internal).
@@ -590,7 +590,7 @@ next_line(Bin, #data{buf = Buf} = Data) ->
590590
%% and remaining data. an "action" is a line which should have been a json-encoded map
591591
%% containing an "action" key. if decoding fails with a thrown error, that error is
592592
%% returned as the decoded value.
593-
-spec next_action(binary(), #data{}) -> {map() | undefined, #data{}, binary()}.
593+
-spec next_action(binary(), #data{}) -> {map() | undefined | {error, term()}, #data{}, binary()}.
594594
next_action(Bin, Data) ->
595595
case next_line(Bin, Data) of
596596
{undefined, NData, Rest} ->

src/kpl_agg.erl

+22-20
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535
%%% end
3636
%%%
3737
%%% The result currently uses a non-standard magic prefix to prevent the KCL from
38-
%%% deaggregating the record automatically. To use compression, use
39-
%%% `kpl_agg:finish/2` with `true` as the second argument, which uses another
38+
%%% deaggregating the record automatically. To use compression, instantiate the
39+
%%% aggregator using kpl_agg:new(true), which uses another
4040
%%% non-standard magic prefix.
4141
%%%
4242
%%% @end
@@ -45,7 +45,7 @@
4545
-module(kpl_agg).
4646

4747
%% API
48-
-export([new/0, count/1, size_bytes/1, finish/1, finish/2, add/2, add_all/2]).
48+
-export([new/0, new/1, count/1, size_bytes/1, finish/1, add/2, add_all/2]).
4949

5050
-define(MD5_DIGEST_BYTES, 16).
5151
%% From http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html:
@@ -57,7 +57,8 @@
5757
%% A set of keys, mapping each key to a unique index.
5858
-record(keyset, {
5959
rev_keys = [] :: list(binary()), %% list of known keys in reverse order
60-
key_to_index = maps:new() :: map() %% maps each known key to a 0-based index
60+
rev_keys_length = 0 :: non_neg_integer(), %% length of the rev_keys list
61+
key_to_index = maps:new() :: map() %% maps each known key to a 0-based index
6162
}).
6263

6364
%% Internal state of a record aggregator. It stores an aggregated record that
@@ -76,7 +77,9 @@
7677
explicit_hash_keyset = #keyset{} :: #keyset{},
7778

7879
%% List if user records added so far, in reverse order.
79-
rev_records = [] :: [#'Record'{}]
80+
rev_records = [] :: [#'Record'{}],
81+
82+
should_deflate = false
8083
}).
8184

8285

@@ -85,7 +88,9 @@
8588
%%%===================================================================
8689

8790
new() ->
88-
#state{}.
91+
new(false).
92+
new(ShouldDeflate) ->
93+
#state{should_deflate = ShouldDeflate}.
8994

9095
count(#state{num_user_records = Num} = _State) ->
9196
Num.
@@ -101,15 +106,12 @@ size_bytes(#state{agg_size_bytes = Size,
101106
end)
102107
+ byte_size(kpl_agg_pb:encode_msg(#'AggregatedRecord'{})).
103108

104-
finish(#state{num_user_records = 0} = State, _) ->
109+
finish(#state{num_user_records = 0} = State) ->
105110
{undefined, State};
106111

107-
finish(#state{agg_partition_key = AggPK, agg_explicit_hash_key = AggEHK} = State, ShouldDeflate) ->
112+
finish(#state{agg_partition_key = AggPK, agg_explicit_hash_key = AggEHK, should_deflate = ShouldDeflate} = State) ->
108113
AggRecord = {AggPK, serialize_data(State, ShouldDeflate), AggEHK},
109-
{AggRecord, new()}.
110-
111-
finish(State) ->
112-
finish(State, false).
114+
{AggRecord, new(ShouldDeflate)}.
113115

114116

115117
add(State, {PartitionKey, Data} = _Record) ->
@@ -277,23 +279,23 @@ is_key(Key, #keyset{key_to_index = KeyToIndex} = _KeySet) ->
277279

278280
get_or_add_key(undefined, KeySet) ->
279281
{undefined, KeySet};
280-
get_or_add_key(Key, #keyset{rev_keys = RevKeys, key_to_index = KeyToIndex} = KeySet) ->
282+
get_or_add_key(Key, #keyset{rev_keys = RevKeys, rev_keys_length = Length, key_to_index = KeyToIndex} = KeySet) ->
281283
case maps:get(Key, KeyToIndex, not_found) of
282284
not_found ->
283-
Index = length(RevKeys),
284285
NewKeySet = KeySet#keyset{
285286
rev_keys = [Key | RevKeys],
286-
key_to_index = maps:put(Key, Index, KeyToIndex)
287+
rev_keys_length = Length + 1,
288+
key_to_index = maps:put(Key, Length, KeyToIndex)
287289
},
288-
{Index, NewKeySet};
290+
{Length, NewKeySet};
289291
Index ->
290292
{Index, KeySet}
291293
end.
292294

293295

294-
potential_index(Key, #keyset{rev_keys = RevKeys, key_to_index = KeyToIndex} = _KeySet) ->
296+
potential_index(Key, #keyset{rev_keys_length = Length, key_to_index = KeyToIndex} = _KeySet) ->
295297
case maps:get(Key, KeyToIndex, not_found) of
296-
not_found -> length(RevKeys);
298+
not_found -> Length;
297299
Index -> Index
298300
end.
299301

@@ -443,9 +445,9 @@ full_record_test() ->
443445

444446

445447
deflate_test() ->
446-
Agg0 = new(),
448+
Agg0 = new(true),
447449
{undefined, Agg1} = add(Agg0, {<<"pk1">>, <<"data1">>, <<"ehk1">>}),
448-
{{_, Data, _}, _} = finish(Agg1, true),
450+
{{_, Data, _}, _} = finish(Agg1),
449451
<<Magic:4/binary, Deflated/binary>> = Data,
450452
?assertEqual(?KPL_AGG_MAGIC_DEFLATED, Magic),
451453
Inflated = zlib:uncompress(Deflated),

0 commit comments

Comments
 (0)