Skip to content

Commit 01aaa1f

Browse files
committed
Initial commit
0 parents  commit 01aaa1f

File tree

8 files changed

+289
-0
lines changed

8 files changed

+289
-0
lines changed

.formatter.exs

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Used by "mix format"
2+
[
3+
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
4+
]

.gitignore

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# The directory Mix will write compiled artifacts to.
2+
/_build/
3+
4+
# If you run "mix test --cover", coverage assets end up here.
5+
/cover/
6+
7+
# The directory Mix downloads your dependencies sources to.
8+
/deps/
9+
10+
# Where third-party dependencies like ExDoc output generated docs.
11+
/doc/
12+
13+
# Ignore .fetch files in case you like to edit your project deps locally.
14+
/.fetch
15+
16+
# If the VM crashes, it generates a dump, let's ignore it too.
17+
erl_crash.dump
18+
19+
# Also ignore archive artifacts (built via "mix archive.build").
20+
*.ez
21+
22+
# Ignore package tarball (built via "mix hex.build").
23+
brc-*.tar
24+
25+
# Temporary files, for example, from tests.
26+
/tmp/

README.md

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Brc
2+
3+
My attempt at the billion row challenge in Elixir.
4+
Requires Elixir 1.16.0 or later: the argument order of `File.stream!/3` changed between 1.15 and 1.16, and this code uses the 1.16 order.
5+
Vanilla elixir, no extra libraries to run. I am including eflambe to run & make flamegraphs to help tune performance.
6+
7+
brc_city was my first attempt. It used a process for every city. Don't do this: most of your app's time will be spent with processes sleeping.
8+
9+
brc uses a pool of workers. Each worker receives a list of cities. Fewer workers with more work eliminates idle time.
10+
11+
If using eflambe, run something like
12+
13+
iex -S mix
14+
15+
:eflambe.apply({Brc, :run_file_buf, ["measurements.txt"]}, [output_format: :brendan_gregg, open: :speedscope])
16+
17+
18+
## Installation
19+
20+
After cloning, run 'mix deps.get' to get eflambe. Then 'mix escript.build', then './brc measurements.txt'
21+

lib/brc.ex

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
defmodule Brc do
  @moduledoc """
  Billion-row-challenge aggregator built on a fixed pool of worker agents.

  The input file is read in fixed-size byte chunks, split into lines, grouped
  into jobs of `@job_size` lines and handed round-robin to `@pool_size` agents.
  Each agent keeps its own partial map of `city => {min, count, sum, max}`;
  the partial maps are merged once the whole file has been dispatched.
  """

  use Agent

  @pool_size 8
  @job_size 10_000
  @read_bytes 65_536

  @typedoc "Per-city statistics; every temperature is stored multiplied by ten."
  @type stats :: {min :: integer(), count :: pos_integer(), sum :: integer(), max :: integer()}

  @doc """
  Folds `lines` into `state_map` and returns the updated map.

  Each line must look like `"City;12.3"`. The temperature is kept as an
  integer scaled by ten (`"12.3"` becomes `123`) so no float arithmetic is
  needed while aggregating. If a city is already in the map, its min, count,
  sum and max are updated; otherwise it is inserted with a count of one.

  Raises `MatchError` when a line has no `;` or the temperature has no `.`.
  """
  @spec process_lines(%{optional(binary()) => stats()}, [binary()]) ::
          %{optional(binary()) => stats()}
  def process_lines(state_map, lines) do
    Enum.reduce(lines, state_map, fn line, acc_map ->
      [city, temperature_text] = :binary.split(line, ";")
      [t1, t2] = :binary.split(temperature_text, ".")
      # Dropping the decimal point yields the temperature multiplied by ten.
      temperature = :erlang.binary_to_integer(t1 <> t2)

      Map.update(
        acc_map,
        city,
        {temperature, 1, temperature, temperature},
        fn {min_temp, count, sum, max_temp} ->
          {min(min_temp, temperature), count + 1, sum + temperature, max(max_temp, temperature)}
        end
      )
    end)
  end

  @doc """
  Reads `filename`, splits it into jobs and discards them.

  Returns a list with one `:ok` per job. It does no aggregation, so it only
  exercises the read-and-split half of the pipeline.
  """
  @spec test_file_buf(Path.t()) :: [:ok]
  def test_file_buf(filename) do
    filename
    |> stream_jobs()
    |> Enum.map(fn _job -> :ok end)
  end

  @doc """
  Aggregates `filename` and prints the result in brc format
  (`{City=min/mean/max, ...}`, sorted by city) to standard output.

  Returns `:ok`.
  """
  @spec run_file_buf(Path.t()) :: :ok
  def run_file_buf(filename) do
    # A tuple gives constant-time access when picking the next worker.
    workers =
      1..@pool_size
      |> Enum.map(fn _ ->
        {:ok, pid} = Agent.start_link(fn -> %{} end)
        pid
      end)
      |> List.to_tuple()

    # Round robin through the pool. The job index replaces the named ETS
    # counter, which made a second call from the same process raise because
    # the table already existed.
    filename
    |> stream_jobs()
    |> Stream.with_index()
    |> Enum.each(fn {job, index} ->
      # this is basically using Agents as less-hassle GenServers
      Agent.cast(elem(workers, rem(index, @pool_size)), __MODULE__, :process_lines, [job])
    end)

    # Agent.get is synchronous and is handled after every cast queued before
    # it, so this waits for each worker to finish. The timeout is :infinity
    # because a worker may still be draining a long queue of jobs.
    combined_map =
      workers
      |> Tuple.to_list()
      |> Enum.map(fn pid ->
        partial_map = Agent.get(pid, fn state -> state end, :infinity)
        Agent.stop(pid)
        partial_map
      end)
      |> Enum.reduce(&merge_partials/2)

    output =
      combined_map
      |> Enum.sort_by(fn {city, _stats} -> city end)
      |> Enum.map_join(", ", &format_city/1)

    # output in brc format
    IO.puts("{#{output}}")
  end

  @doc """
  Escript entry point: runs `run_file_buf/1` on the first argument and prints
  the elapsed wall-clock time. Prints a usage line when no file is given.
  """
  @spec main([String.t()]) :: :ok
  def main([filename | _rest]) do
    {usec, :ok} =
      :timer.tc(fn ->
        run_file_buf(filename)
        :ok
      end)

    IO.puts("It took #{usec / 1000} milliseconds")
  end

  def main(_args) do
    IO.puts(:stderr, "usage: brc <measurements-file>")
  end

  # Lazily yields lists of up to @job_size lines read from `filename`.
  #
  # The file is read in @read_bytes chunks and split on "\n" by hand, which is
  # faster than letting File.stream! do :line by itself. The piece after the
  # last newline of a chunk is carried over to the next chunk. When the file
  # ends, any non-empty remainder is emitted as a final line so that a file
  # without a trailing newline does not lose its last record.
  defp stream_jobs(filename) do
    filename
    |> File.stream!(@read_bytes, [:read_ahead])
    |> Stream.transform(
      fn -> <<>> end,
      fn chunk, partial ->
        [new_partial | complete_lines] =
          (partial <> chunk)
          |> :binary.split(<<"\n">>, [:global])
          |> Enum.reverse()

        {complete_lines, new_partial}
      end,
      fn
        <<>> -> {[], <<>>}
        last_line -> {[last_line], <<>>}
      end,
      fn _partial -> :ok end
    )
    |> Stream.chunk_every(@job_size)
  end

  # Merges one worker's partial map into the accumulated map, combining the
  # statistics of cities present in both.
  defp merge_partials(partial_map, acc_map) do
    Map.merge(acc_map, partial_map, fn _city, {min1, count1, sum1, max1}, {min2, count2, sum2, max2} ->
      {min(min1, min2), count1 + count2, sum1 + sum2, max(max1, max2)}
    end)
  end

  # Renders one city as "City=min/mean/max", undoing the times-ten scaling.
  defp format_city({city, {min_temp, count, sum, max_temp}}) do
    mean = :erlang.float_to_binary(sum / (count * 10), decimals: 1)
    "#{city}=#{min_temp / 10}/#{mean}/#{max_temp / 10}"
  end
end

lib/brc_city.ex

+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
defmodule BrcCity do
  @moduledoc """
  First attempt at the billion row challenge: one agent per city.

  A dispatcher agent holds a map of `city => pid`. Every line is cast to the
  dispatcher, which parses it, starts an agent for the city if none exists
  yet, and casts the temperature to that agent.
  """

  use Agent

  @doc """
  Parses `line` (`"City;12.3"`) and forwards the temperature, scaled by ten
  as an integer, to the agent for that city, starting the agent if needed.

  Meant to run inside the dispatcher agent: `state` is the dispatcher's
  `city => pid` map and the return value is the updated map.
  """
  @spec dispatch_line(%{optional(binary()) => pid()}, binary()) :: %{optional(binary()) => pid()}
  def dispatch_line(state, line) do
    [city, temperature_text] = :binary.split(line, ";")
    [t1, t2] = :binary.split(temperature_text, ".")
    temperature = :erlang.binary_to_integer(t1 <> t2)

    {new_state, pid} =
      case Map.get(state, city) do
        nil ->
          # Sentinel min and max; the first update_worker call replaces them
          # for any temperature strictly between -1000 and 1000.
          {:ok, pid} = Agent.start_link(fn -> {1000, 0, 0, -1000} end)
          {Map.put(state, city, pid), pid}

        existing_pid ->
          {state, existing_pid}
      end

    # update_worker/2 is defined in this module, so the cast must target it
    # here; casting to Brc crashed the agent with UndefinedFunctionError.
    Agent.cast(pid, __MODULE__, :update_worker, [temperature])
    new_state
  end

  @doc """
  Collects the statistics of every city agent in `state` and returns them as
  one string in brc format (`{City=min/mean/max, ...}`), sorted by city.
  """
  @spec dispatch_get(%{optional(binary()) => pid()}) :: String.t()
  def dispatch_get(state) do
    body =
      state
      |> Enum.sort_by(fn {city, _pid} -> city end)
      |> Enum.map_join(", ", fn {city, pid} ->
        # :infinity because the city agent may still be working through
        # queued updates when the request arrives.
        {min_temp, count, sum, max_temp} = Agent.get(pid, fn stats -> stats end, :infinity)
        mean = :erlang.float_to_binary(sum / (count * 10), decimals: 1)
        "#{city}=#{min_temp / 10}/#{mean}/#{max_temp / 10}"
      end)

    "{#{body}}"
  end

  @doc """
  Folds one `temperature` into a city's `{min, count, sum, max}` tuple.
  """
  @spec update_worker({integer(), integer(), integer(), integer()}, integer()) ::
          {integer(), integer(), integer(), integer()}
  def update_worker({min_temp, count, sum, max_temp}, temperature) do
    {
      min(min_temp, temperature),
      count + 1,
      sum + temperature,
      max(max_temp, temperature)
    }
  end

  @doc """
  Aggregates `filename` with one agent per city and prints the result in brc
  format to standard output. Returns `:ok`.
  """
  @spec run_file_buf(Path.t()) :: :ok
  def run_file_buf(filename) do
    {:ok, dispatch_pid} = Agent.start_link(fn -> %{} end)

    leftover =
      filename
      |> File.stream!(16_384, [:read_ahead])
      |> Enum.reduce(<<>>, fn chunk, partial ->
        gather_lines(partial <> chunk, dispatch_pid)
      end)

    # A file without a trailing newline leaves its last record here.
    if leftover != <<>> do
      Agent.cast(dispatch_pid, __MODULE__, :dispatch_line, [leftover])
    end

    # The dispatcher handles this call only after every line cast above.
    my_list = Agent.get(dispatch_pid, &dispatch_get/1, :infinity)
    IO.puts(my_list)

    # Stop all agents so repeated runs in one VM do not accumulate processes.
    dispatch_pid
    |> Agent.get(&Map.values/1, :infinity)
    |> Enum.each(&Agent.stop/1)

    Agent.stop(dispatch_pid)
    :ok
  end

  @doc """
  Entry point: runs `run_file_buf/1` on the first argument and prints the
  elapsed wall-clock time. Prints a usage line when no file is given.
  """
  @spec main([String.t()]) :: :ok
  def main([filename | _rest]) do
    {usec, :ok} =
      :timer.tc(fn ->
        run_file_buf(filename)
        :ok
      end)

    IO.puts("It took #{usec / 1000} milliseconds")
  end

  def main(_args) do
    IO.puts(:stderr, "usage: BrcCity.main([measurements_file])")
  end

  # Casts every complete line of `buffer` to the dispatcher, in order, and
  # returns the unterminated remainder after the last "\n" (possibly empty).
  defp gather_lines(buffer, dispatch_pid) do
    {complete_lines, [remainder]} =
      buffer
      |> :binary.split(<<"\n">>, [:global])
      |> Enum.split(-1)

    Enum.each(complete_lines, fn line ->
      Agent.cast(dispatch_pid, __MODULE__, :dispatch_line, [line])
    end)

    remainder
  end
end

mix.exs

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
defmodule Brc.MixProject do
  @moduledoc false

  use Mix.Project

  # Mix project definition; `escript: [main_module: Brc]` makes `Brc.main/1`
  # the entry point of the `brc` executable built by `mix escript.build`.
  def project do
    [
      app: :brc,
      escript: [main_module: Brc],
      version: "0.1.0",
      # 1.16 is the minimum: the code calls File.stream!(path, bytes, modes),
      # an argument order that is only accepted from Elixir 1.16 on.
      elixir: "~> 1.16",
      start_permanent: Mix.env() == :prod,
      deps: deps()
    ]
  end

  # Run "mix help compile.app" to learn about applications.
  def application do
    [
      extra_applications: [:logger]
    ]
  end

  # Run "mix help deps" to learn about dependencies.
  defp deps do
    [
      # Only used to produce flamegraphs while tuning performance.
      {:eflambe, "~> 0.3.1"}
      # {:dep_from_hexpm, "~> 0.3.0"},
      # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
    ]
  end
end

test/brc_test.exs

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
defmodule BrcTest do
  # The generated "greets the world" test called Brc.hello/0, which Brc does
  # not define, so the suite could never pass. These tests cover the pure
  # aggregation step instead.
  use ExUnit.Case, async: true
  doctest Brc

  describe "process_lines/2" do
    test "inserts unseen cities with the temperature scaled by ten" do
      assert Brc.process_lines(%{}, ["Oslo;-1.5", "Rome;20.1"]) ==
               %{"Oslo" => {-15, 1, -15, -15}, "Rome" => {201, 1, 201, 201}}
    end

    test "folds repeated readings into min, count, sum and max" do
      state = %{"Oslo" => {-15, 1, -15, -15}}

      assert Brc.process_lines(state, ["Oslo;3.0", "Oslo;-0.5"]) ==
               %{"Oslo" => {-15, 3, 10, 30}}
    end

    test "returns the state unchanged for an empty job" do
      state = %{"Oslo" => {-15, 1, -15, -15}}
      assert Brc.process_lines(state, []) == state
    end
  end
end

test/test_helper.exs

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ExUnit.start()

0 commit comments

Comments
 (0)