-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun
executable file
·95 lines (77 loc) · 3.39 KB
/
run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#!/usr/bin/env escript
%% -*- erlang -*-
%%! -smp enable -sname cougar
-define(DS, data_store).
-define(USAGE,
"This is an Erlang script that invokes a Service, specified in the 1st argument.\n"
"Any subsequent arguments are passed onto the invoked Service.\n"
"Regardless of how generic I would like to sound,\n"
"some behaviour of this `run` script is still specific to the `cougar` service.\n"
"\n"
"The `cougar` service is a word occurrence counter,\n"
"where words are separated by line-feed on standard input.\n"
"The words and they occurrences are printed on standard out in no particular order.\n"
"\n"
"To run the `cougar` service you need an input file, f.x.: a log file.\n"
"> cat 'some-log-file-name' | ./run cougar 10 0\n"
"\n"
"10, the first numeric argument specifies the maximum size the buffer is allowed to grow\n"
"before flushing to main storage (disk).\n"
"Using 0 instead of 10 would enable the buffer to grow infinitely large.\n"
"\n"
"0, the last argument disables the functionality of flushing the buffer after a specific timeout.\n"
"Using 1000 here would flush the buffer every second.\n"
"\n"
"The implementation uses a `buffer` to store intermediate results in memory.\n"
"The type of this `buffer` can either be `ets` or `dict`, prior being the default.\n"
"To use `dict`s, try:\n"
"> cat 'some-log-file-name' | BUFFER_TYPE=dict ./run cougar 10 0\n"
"\n"
"Recommended command:\n"
"> time sh -c 'tail -n100000 <large_file>.tsv | cut -f3 | ./run cougar 0 1000'\n"
).
read_stdin(Callback) ->
stdin_handler(io:get_line(""), Callback).
stdin_handler(eof, _Callback) -> done;
stdin_handler({error, Error}, _Callback) -> throw(Error);
stdin_handler(Data, Callback) ->
Callback(string:strip(Data, right, $\n)),
read_stdin(Callback).
dets_handler(DetsTable, WrappedFun) ->
% Open Database (data store file)
dets:open_file(DetsTable, [{access, read_write},
{auto_save, infinity},
{type, set}]),
% Reset the data store between runs
dets:delete_all_objects(DetsTable),
WrappedFun(DetsTable),
% Print out counts
dets:traverse(DetsTable, fun({K, V}) -> io:format("~s\t~p~n", [K, V]), continue end),
% Close data store
dets:close(DetsTable).
service_handler(Service, ServiceArgs, WrappedFun) ->
% Start the buffered data store writer
Service:start(ServiceArgs),
% Read data from standard in and write it to storage.
WrappedFun(fun Service:write/1),
% Stop the writer
Service:stop().
compile(Service) ->
% Compile source file.
BufferType = case os:getenv("BUFFER_TYPE") of
false -> "ets"; Value -> Value
end,
[ c:c(F, {d, list_to_atom(BufferType)})
|| F <- filelib:wildcard(Service++"*.erl") ].
main([Service, _MaxBufferSize, _FlushTimerTimeout] = Args) ->
compile(Service),
ServiceArgs = [ list_to_integer(A)|| A <- tl(Args) ],
ServiceInvoker = fun(Callback) -> read_stdin(Callback) end,
ServiceHandler = fun(DetsTable) -> service_handler(list_to_atom(Service),
[DetsTable|ServiceArgs],
ServiceInvoker)
end,
dets_handler(?DS, ServiceHandler);
main(_) ->
io:format(?USAGE),
halt(1).