-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbytes_per_user.py
82 lines (71 loc) · 3.43 KB
/
bytes_per_user.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
""" An example to show how to compute values over the whole dataset with dask
"""
import altair
import infra.dask
import dask.dataframe
import dask.distributed
import numpy as np
import pandas as pd
def to_long_form(user_totals: pd.DataFrame) -> pd.DataFrame:
    """Reshape per-user byte totals from wide form to long form for altair.

    Args:
        user_totals: Frame indexed by ``user`` with ``bytes_up`` and
            ``bytes_down`` columns (the shape produced by the groupby in
            ``main``).

    Returns:
        A new frame with the columns ``user``, ``direction`` and ``amount``,
        holding one row per (user, direction) pair. The input frame is not
        modified.
    """
    # Add the index back as a dataframe column
    with_user_column = user_totals.reset_index()

    # Transform to long form for altair.
    # https://altair-viz.github.io/user_guide/data.html#converting-between-long-form-and-wide-form-pandas
    return with_user_column.melt(
        id_vars=["user"],
        value_vars=["bytes_up", "bytes_down"],
        var_name="direction",
        value_name="amount",
    )


def build_chart(long_totals: pd.DataFrame) -> "altair.Chart":
    """Build the "Data used per user" bar chart.

    Args:
        long_totals: Long-form frame with ``user``, ``direction`` and
            ``amount`` columns, as returned by ``to_long_form``.

    Returns:
        The configured altair chart: one column facet per user, with one bar
        per direction inside each facet. Nothing is rendered or served here.
    """
    # Facet by user, ordering the facets by amount (descending) and drawing
    # the user labels rotated underneath each facet.
    user_facet = altair.Column(
        "user",
        title="",
        header=altair.Header(
            labelOrient="bottom",
            labelAngle=-60,
            labelAnchor="middle",
            labelAlign="right",
        ),
        sort=altair.SortField(field="amount", order="descending"),
    )

    bars = altair.Chart(long_totals).mark_bar().encode(
        # Direction is already conveyed by color, so hide the x axis labels.
        x=altair.X("direction", title="", axis=altair.Axis(labels=False)),
        y=altair.Y("amount:Q", title="Amount (Bytes)"),
        color="direction",
        column=user_facet,
    )

    return bars.properties(
        title="Data used per user"
    ).configure_title(
        fontSize=20,
        font='Courier',
        anchor='start',
        color='gray'
    )


def main() -> None:
    """Sum bytes up/down per user over the whole flows dataset and chart it.

    Starts the dask client, aggregates the flows dataset per user, prints the
    intermediate frames, and finally serves a bar chart on port 8891.
    """
    # The returned client is not used directly below; the reference is simply
    # held for the duration of the run.
    client = infra.dask.setup_dask_client()

    # Import the flows dataset
    #
    # Importantly, dask is lazy and doesn't actually import the whole thing,
    # but just keeps track of where the file shards live on disk.
    flows = dask.dataframe.read_parquet(
        "data/clean/flows/typical_DIV_none_INDEX_user", engine="fastparquet")

    print("To see execution status, check out the dask status page at localhost:8787 while the computation is running.")
    # Note: len() on a dask dataframe is itself a computation over the data.
    print("Processing {} flows".format(len(flows)))

    # The user column will become an index of the returned grouped frame.
    # "sum" selects the built-in groupby sum; it yields the same totals as
    # passing the np.sum callable, without relying on callable-to-name
    # resolution.
    user_totals = flows.groupby("user").aggregate({"bytes_up": "sum",
                                                   "bytes_down": "sum"})

    # Calling compute will realize user_totals as a single local pandas
    # dataframe. Be careful calling .compute() on big stuff! You may run out
    # of memory...
    user_totals = user_totals.compute()

    # Once you have a total, you might want to dump it to disk so you don't
    # have to re-run the dask computations again! At this point you can work
    # in a jupyter notebook from the intermediate file if that's easier too.
    # user_totals.to_parquet("scratch/temp",
    #                        compression="snappy",
    #                        engine="fastparquet")
    # user_totals = dask.dataframe.read_parquet("scratch/temp",
    #                                           engine="fastparquet").compute()

    # Once user_totals has been computed and you can do things with
    # it as a normal dataframe : )
    print(user_totals)
    print("user_totals is a ", type(user_totals))

    user_totals = to_long_form(user_totals)
    print(user_totals)

    print("Check out a basic chart!")
    chart = build_chart(user_totals)
    # NOTE(review): Chart.serve() is available in Altair 4.x but appears to
    # have been removed in Altair 5 -- confirm the pinned altair version.
    chart.serve(port=8891, open_browser=False)


if __name__ == "__main__":
    main()