-
Notifications
You must be signed in to change notification settings - Fork 0
/
wtiproj06_cassandra_client.py
121 lines (100 loc) · 3.95 KB
/
wtiproj06_cassandra_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import json
from cassandra.cluster import Cluster
from cassandra.query import dict_factory
from wtiproj03_ETL import dict_list_to_df
class CassandraClient:
    """Small wrapper around a Cassandra session bound to one ``keyspace.table``.

    On construction the keyspace and a table with columns
    ``(id int PRIMARY KEY, ratings text)`` are created if they do not exist.
    Rows come back as dicts because the session uses ``dict_factory``.
    The read helpers ``json.loads`` column values, so callers are expected to
    store JSON text in the ``ratings`` column.

    Attributes:
        cluster: the driver ``Cluster``; call :meth:`close` (or use the client
            as a context manager) to shut it down.
        session: the connected session used for every statement.
        keyspace, table: target names. They are concatenated into CQL as-is
            (identifiers cannot be bound as parameters), so they must come from
            trusted code, not user input.
        lastindex: number of rows read back after the last
            :meth:`push_data_table`; set to 0 on construction and by
            :meth:`clear_table` / :meth:`delete_table`.
    """

    def __init__(self, host, port, keyspace, table):
        """Connect to ``host``:``port`` and ensure ``keyspace.table`` exists.

        NOTE(review): ``lastindex`` starts at 0 even when the table already
        holds rows from a previous run.
        """
        self.cluster = Cluster([host], port=port)
        self.session = self.cluster.connect()
        self.session.row_factory = dict_factory
        self.keyspace = keyspace
        self.table = table
        self.create_keyspace()
        self.__create_table()
        self.lastindex = 0

    def _qualified_name(self):
        """Return ``"<keyspace>.<table>"`` for use in CQL statements."""
        return self.keyspace + "." + self.table

    def create_keyspace(self):
        """Create the keyspace (SimpleStrategy, replication factor 1) if absent."""
        self.session.execute(
            "CREATE KEYSPACE IF NOT EXISTS " + self.keyspace
            + " WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' }"
        )

    def __create_table(self):
        """Create the table with an int ``id`` primary key and a text ``ratings`` column."""
        self.session.execute(
            "CREATE TABLE IF NOT EXISTS " + self._qualified_name()
            + " (id int, ratings text, PRIMARY KEY (id))"
        )
        self.lastindex = 0

    def push_data_table(self, id, ratings):
        """Write one row and refresh ``lastindex`` to the table's row count.

        Args:
            id: int primary key; a row with the same id is overwritten
                (CQL INSERT is an upsert).
            ratings: text stored verbatim; the read helpers expect JSON.
        """
        self.session.execute(
            "INSERT INTO " + self._qualified_name() + " (id, ratings) "
            "VALUES (%(id)s, %(ratings)s)",
            {"id": id, "ratings": ratings},
        )
        # NOTE(review): this reads the entire table back after every insert
        # (O(n) per call). Kept so ``lastindex`` keeps its "row count" meaning.
        self.lastindex = len(self.get_data_table())

    def _select_all(self):
        """Execute ``SELECT *`` over the whole table and return the result rows."""
        return self.session.execute("SELECT * FROM " + self._qualified_name() + ";")

    def _select_by_id(self, user_id):
        """Execute ``SELECT *`` for the single row whose primary key is ``user_id``.

        ``user_id`` is converted with ``int()``, so ints and numeric strings
        both work (a non-numeric value raises ``ValueError``). It is bound as a
        parameter rather than concatenated into the statement, which would
        reject int ids and allow CQL injection through strings.
        """
        return self.session.execute(
            "SELECT * FROM " + self._qualified_name() + " WHERE id = %(id)s;",
            {"id": int(user_id)},
        )

    @staticmethod
    def _decode_rows(rows, key):
        """JSON-decode ``row[key]`` for each row and derive the genre names.

        Returns:
            ``(decoded_values, genre_names)``. ``genre_names`` is ``list()`` of
            ``dict_list_to_df(decoded_values)`` minus its last three entries.
            NOTE(review): presumably the last three DataFrame columns are
            non-genre bookkeeping fields; confirm against ``wtiproj03_ETL``.
        """
        table_as_list = [json.loads(row[key]) for row in rows]
        genre_names = list(dict_list_to_df(table_as_list))[:-3]
        return table_as_list, genre_names

    def get_data_table(self):
        """Return every row of the table as a list of dicts (no JSON decoding)."""
        return list(self._select_all())

    def pull_data_table(self, key):
        """Return ``(decoded values of column key, genre names)`` for all rows."""
        return self._decode_rows(self._select_all(), key)

    def pull_avg_data_table(self, key, user_id):
        """Like :meth:`pull_data_table`, restricted to the row with id ``user_id``."""
        return self._decode_rows(self._select_by_id(user_id), key)

    def print_data_table(self):
        """Print the JSON-decoded ``ratings`` value of every row."""
        for row in self._select_all():
            print(json.loads(row["ratings"]))

    def clear_table(self):
        """Remove all rows (TRUNCATE) and reset ``lastindex``."""
        self.session.execute("TRUNCATE " + self._qualified_name() + ";")
        self.lastindex = 0

    def delete_table(self):
        """Drop the table and reset ``lastindex`` (it no longer has any rows)."""
        self.session.execute("DROP TABLE " + self._qualified_name() + ";")
        self.lastindex = 0

    def close(self):
        """Shut down the driver cluster, closing its sessions and connections."""
        self.cluster.shutdown()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False
if __name__ == "__main__":
    # Maintenance script: drop every table this project keeps in Cassandra.
    # Constructing a client first runs CREATE ... IF NOT EXISTS for the
    # keyspace and table, so the unconditional DROP TABLE cannot hit a
    # missing table.
    import os

    # Defaults are the original hard-coded values; override per environment.
    host = os.environ.get("CASSANDRA_HOST", "192.168.137.96")
    port = int(os.environ.get("CASSANDRA_PORT", "9043"))

    tables_to_drop = [
        ("ratings", "ratings_all"),
        ("avg", "avg_genre_ratings"),
        ("avg", "avg_genre_ratings_for_user"),
        ("avg", "user_profile"),
    ]
    for keyspace, table in tables_to_drop:
        client = CassandraClient(host, port, keyspace, table)
        try:
            client.delete_table()
        finally:
            # Release this client's driver connections instead of keeping one
            # open Cluster per table until the interpreter exits.
            client.cluster.shutdown()