-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathincomplete_py_implementation.py
45 lines (40 loc) · 1.57 KB
/
incomplete_py_implementation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import xxhash
import jumphash
import socket
import struct
class RedisClient:
def __init__(self, replicas):
self.conns = [[] for _ in replicas]
self.writers = [[] for _ in replicas]
self.readers = [[] for _ in replicas]
for i, servers in enumerate(replicas):
for server in servers:
ip, port = server.split(":")
conn = socket.socket()
conn.connect((ip, int(port)))
self.conns[i].append(conn)
self.writers[i].append(conn.makefile("w"))
self.readers[i].append(conn.makefile("r"))
@staticmethod
def find_shard(key, num_servers):
h = xxhash.xxh64()
h.update(key.encode("utf-8"))
return jumphash.jump_consistent(h.intdigest(), num_servers)
def set(self, key, value):
shard = self.find_shard(key, len(self.conns[0]))
result, err = None, None
for i, replica_conns in enumerate(self.conns):
conn = replica_conns[shard]
writer = self.writers[i][shard]
self.send_command(writer, "SET", key, value)
result, err = self.read_simple_string(conn)
return result, err
def delete(self, key):
shard = self.find_shard(key, len(self.conns[0]))
result, err = None, None
for i, replica_conns in enumerate(self.conns):
conn = replica_conns[shard]
writer = self.writers[i][shard]
self.send_command(writer, "DELETE", key)
result, err = self.read_simple_string(conn)
return result, err