Skip to content

Commit 7f2f891

Browse files
committed
Switched from a file-stream-based approach
to a prim_file-and-buffer-based approach. Use Erlang's prim_file to read large buffers, then do a read_line to reach the next newline after the buffer.
1 parent 05eb603 commit 7f2f891

File tree

1 file changed

+60
-48
lines changed

1 file changed

+60
-48
lines changed

lib/brc.ex

+60-48
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,84 @@
11
defmodule Brc do
22
use Agent
33

4-
@pool_size 8
5-
@job_size 10_000
4+
@pool_size :erlang.system_info(:logical_processors)
5+
@blob_size 100_000
66

77
# each process has its own partial map of the cities
88
# for each line, parse out the city name and temperature * 10, keeping the numbers as integers
99
# if a city is already in the map, to the min, max and counting on it
10-
def process_lines(state_map, lines) do
10+
def process_lines(state_map, buffer) do
11+
lines = :binary.split(buffer, <<"\n">>, [:global])
1112

1213
Enum.reduce(lines, state_map, fn line, acc_map ->
13-
[city, temperature_text] = :binary.split(line, ";")
14-
[t1, t2] = :binary.split(temperature_text, ".")
15-
temperature = :erlang.binary_to_integer(t1 <> t2)
16-
17-
Map.update(
18-
acc_map,
19-
city,
20-
{temperature, 1, temperature, temperature},
21-
fn {min_temp, count, sum, max_temp} ->
22-
{min(min_temp, temperature), count + 1, sum + temperature, max(max_temp, temperature)}
23-
end
24-
)
14+
case line do
15+
"" ->
16+
acc_map
17+
18+
_ ->
19+
[city, temperature_text] = :binary.split(line, ";")
20+
[t1, t2] = :binary.split(temperature_text, ".")
21+
temperature = :erlang.binary_to_integer(t1 <> t2)
22+
23+
Map.update(
24+
acc_map,
25+
city,
26+
{temperature, 1, temperature, temperature},
27+
fn {min_temp, count, sum, max_temp} ->
28+
{min(min_temp, temperature), count + 1, sum + temperature,
29+
max(max_temp, temperature)}
30+
end
31+
)
32+
end
2533
end)
2634
end
2735

28-
def test_file_buf(filename) do
29-
file_stream = File.stream!(filename, [:read_ahead], 65_536)
30-
# IO.inspect(file_stream)
36+
def process_file(file, worker_pool) do
37+
case :prim_file.read(file, @blob_size) do
38+
:eof ->
39+
:ok
3140

32-
Stream.transform(file_stream, <<>>, fn elem, acc ->
33-
[new_acc | output_enum] = :binary.split(acc <> elem, <<"\n">>, [:global]) |> Enum.reverse()
34-
{output_enum, new_acc}
35-
end)
36-
|> Stream.chunk_every(@job_size)
37-
|> Enum.map(fn _ -> :ok end)
38-
end
41+
{:ok, buffer} ->
42+
buffer =
43+
case :prim_file.read_line(file) do
44+
:eof ->
45+
buffer
3946

47+
{:ok, line} ->
48+
<<buffer::binary, line::binary>>
49+
end
4050

41-
def run_file_buf(filename) do
51+
index = :ets.update_counter(:brc, _key = :index, _increment_by = 1)
4252

53+
Agent.cast(:ets.lookup_element(:workers, rem(index, @pool_size), 2), Brc, :process_lines, [
54+
buffer
55+
])
56+
57+
process_file(file, worker_pool)
58+
end
59+
end
60+
61+
def run_file_buf(filename) do
4362
:ets.new(:brc, [:public, :named_table])
4463
:ets.insert(:brc, {:index, -1})
4564

46-
worker_pool =
47-
Enum.map(1..@pool_size, fn _ ->
48-
Agent.start_link(fn -> %{} end) |> elem(1)
49-
end)
65+
:ets.new(:workers, [:set, :public, :named_table])
5066

51-
file_stream = File.stream!(filename, 65_536, [:read_ahead])
52-
# IO.inspect(file_stream)
67+
worker_pool =
68+
Enum.map(1..@pool_size, fn _ ->
69+
Agent.start_link(fn -> %{} end) |> elem(1)
70+
end)
5371

54-
# transform the bundles of bytes into lines separated by \n
55-
# this is faster than letting File.stream do :lines by itself
56-
Stream.transform(file_stream, <<>>, fn elem, acc ->
57-
[new_acc | output_enum] = :binary.split(acc <> elem, <<"\n">>, [:global]) |> Enum.reverse()
58-
{output_enum, new_acc}
59-
end)
60-
|> Stream.chunk_every(@job_size)
61-
|> Stream.each(fn job ->
62-
# round robin through the pool of workers
63-
index = :ets.update_counter(:brc, _key = :index, _increment_by = 1)
64-
# this is basically using Agents as less-hassle GenServers
65-
Agent.cast(Enum.at(worker_pool, rem(index, @pool_size)), Brc, :process_lines, [job])
66-
end)
67-
|> Stream.run
72+
Enum.each(Enum.zip(0..@pool_size - 1, worker_pool), fn w ->
73+
:ets.insert(:workers, w)
74+
end)
75+
76+
{:ok, file} = :prim_file.open(filename, [:binary, :read])
77+
78+
process_file(file, worker_pool)
6879

6980
# synchronous here to make sure all of the workers are finished
70-
pool_maps = Enum.map(worker_pool, fn pid -> Agent.get(pid, fn state -> state end) end)
81+
pool_maps = Enum.map(worker_pool, fn pid -> Agent.get(pid, fn state -> state end, :infinity) end)
7182

7283
# feeding all other maps into one, first one is the chosen one
7384
[head | tail] = pool_maps
@@ -85,6 +96,7 @@ defmodule Brc do
8596
Map.keys(combined_map)
8697
|> Enum.map(fn key ->
8798
{min_temp, count, sum, max_temp} = Map.get(combined_map, key)
99+
88100
{key,
89101
"#{key}=#{min_temp / 10}/#{:erlang.float_to_binary(sum / (count * 10), decimals: 1)}/#{max_temp / 10}"}
90102
end)
@@ -94,10 +106,10 @@ defmodule Brc do
94106

95107
# output in brc format
96108
IO.puts("{#{Enum.join(sorted_strings, ", ")}}")
97-
98109
end
99110

100111
def main(args) do
112+
IO.puts("Using prim_file")
101113
{uSec, :ok} =
102114
:timer.tc(fn ->
103115
run_file_buf(Enum.at(args, 0))

0 commit comments

Comments
 (0)