Skip to content

Commit f6548df

Browse files
committed
add concurrent map and fix reserve time and init transport
1 parent ace8f91 commit f6548df

File tree

4 files changed

+174
-16
lines changed

4 files changed

+174
-16
lines changed

concurrent_map.go

+156
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"sync"
6+
)
7+
8+
var SHARD_COUNT = 32
9+
10+
// A "thread" safe map of type string:Anything.
11+
// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
12+
type ConcurrentMap []*ConcurrentMapShared
13+
14+
// A "thread" safe string to anything map.
15+
type ConcurrentMapShared struct {
16+
items map[string]interface{}
17+
sync.RWMutex // Read Write mutex, guards access to internal map.
18+
}
19+
20+
// Creates a new concurrent map.
21+
func NewConcurrentMap() ConcurrentMap {
22+
m := make(ConcurrentMap, SHARD_COUNT)
23+
for i := 0; i < SHARD_COUNT; i++ {
24+
m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
25+
}
26+
return m
27+
}
28+
29+
// Returns shard under given key
30+
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
31+
return m[uint(fnv32(key))%uint(SHARD_COUNT)]
32+
}
33+
34+
// Sets the given value under the specified key.
35+
func (m ConcurrentMap) Set(key string, value interface{}) {
36+
// Get map shard.
37+
shard := m.GetShard(key)
38+
shard.Lock()
39+
shard.items[key] = value
40+
shard.Unlock()
41+
}
42+
43+
// Retrieves an element from map under given key.
44+
func (m ConcurrentMap) Get(key string) (interface{}, bool) {
45+
// Get shard
46+
shard := m.GetShard(key)
47+
shard.RLock()
48+
// Get item from shard.
49+
val, ok := shard.items[key]
50+
shard.RUnlock()
51+
return val, ok
52+
}
53+
54+
// Used by the Iter & IterBuffered functions to wrap two variables together over a channel,
55+
type Tuple struct {
56+
Key string
57+
Val interface{}
58+
}
59+
60+
// Returns a buffered iterator which could be used in a for range loop.
61+
func (m ConcurrentMap) IterBuffered() <-chan Tuple {
62+
chans := snapshot(m)
63+
total := 0
64+
for _, c := range chans {
65+
total += cap(c)
66+
}
67+
ch := make(chan Tuple, total)
68+
go fanIn(chans, ch)
69+
return ch
70+
}
71+
72+
// Returns a array of channels that contains elements in each shard,
73+
// which likely takes a snapshot of `m`.
74+
// It returns once the size of each buffered channel is determined,
75+
// before all the channels are populated using goroutines.
76+
func snapshot(m ConcurrentMap) (chans []chan Tuple) {
77+
chans = make([]chan Tuple, SHARD_COUNT)
78+
wg := sync.WaitGroup{}
79+
wg.Add(SHARD_COUNT)
80+
// Foreach shard.
81+
for index, shard := range m {
82+
go func(index int, shard *ConcurrentMapShared) {
83+
// Foreach key, value pair.
84+
shard.RLock()
85+
chans[index] = make(chan Tuple, len(shard.items))
86+
wg.Done()
87+
for key, val := range shard.items {
88+
chans[index] <- Tuple{key, val}
89+
}
90+
shard.RUnlock()
91+
close(chans[index])
92+
}(index, shard)
93+
}
94+
wg.Wait()
95+
return chans
96+
}
97+
98+
// fanIn reads elements from channels `chans` into channel `out`
99+
func fanIn(chans []chan Tuple, out chan Tuple) {
100+
wg := sync.WaitGroup{}
101+
wg.Add(len(chans))
102+
for _, ch := range chans {
103+
go func(ch chan Tuple) {
104+
for t := range ch {
105+
out <- t
106+
}
107+
wg.Done()
108+
}(ch)
109+
}
110+
wg.Wait()
111+
close(out)
112+
}
113+
114+
// Reviles ConcurrentMap "private" variables to json marshal.
115+
func (m ConcurrentMap) MarshalJSON() ([]byte, error) {
116+
// Create a temporary map, which will hold all item spread across shards.
117+
tmp := make(map[string]interface{})
118+
119+
// Insert items to temporary map.
120+
for item := range m.IterBuffered() {
121+
tmp[item.Key] = item.Val
122+
}
123+
return json.Marshal(tmp)
124+
}
125+
126+
func fnv32(key string) uint32 {
127+
hash := uint32(2166136261)
128+
const prime32 = uint32(16777619)
129+
for i := 0; i < len(key); i++ {
130+
hash *= prime32
131+
hash ^= uint32(key[i])
132+
}
133+
return hash
134+
}
135+
136+
// Concurrent map uses Interface{} as its value, therefor JSON Unmarshal
137+
// will probably won't know which to type to unmarshal into, in such case
138+
// we'll end up with a value of type map[string]interface{}, In most cases this isn't
139+
// out value type, this is why we've decided to remove this functionality.
140+
141+
// func (m *ConcurrentMap) UnmarshalJSON(b []byte) (err error) {
142+
// // Reverse process of Marshal.
143+
144+
// tmp := make(map[string]interface{})
145+
146+
// // Unmarshal into a single map.
147+
// if err := json.Unmarshal(b, &tmp); err != nil {
148+
// return nil
149+
// }
150+
151+
// // foreach key,value pair in temporary map insert into our concurrent map.
152+
// for key, val := range tmp {
153+
// m.Set(key, val)
154+
// }
155+
// return nil
156+
// }

config.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ userinfo:
1313
openid: ""
1414
h_5_source: ""
1515
device_token: ""
16+
s_id: ""
1617

1718
headers:
1819
ddmc_city_number: ""

main.go

+16-16
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"net/url"
1212
"strconv"
1313
"strings"
14-
"sync"
1514
"time"
1615
)
1716

@@ -32,19 +31,18 @@ func main() {
3231

3332
handle()
3433

35-
var (
36-
cart, order map[string]interface{}
37-
m sync.Map
38-
)
34+
var cart, order map[string]interface{}
35+
36+
m := NewConcurrentMap()
3937

4038
cart = carts()
4139

4240
if len(cart) != 0 {
43-
m.LoadOrStore("cart", cart)
41+
m.Set("cart", cart)
4442
}
4543

4644
if len(order) != 0 {
47-
m.LoadOrStore("order", order)
45+
m.Set("order", order)
4846
}
4947

5048
reserveMap := customizeReserve()
@@ -55,18 +53,18 @@ func main() {
5553
go execute(notify, func() {
5654
cart = carts()
5755
if len(cart) != 0 {
58-
m.LoadOrStore("cart", cart)
56+
m.Set("cart", cart)
5957
return
6058
}
6159
sleeps()
6260
})
6361

6462
go execute(notify, func() {
65-
storeCart, ok := m.Load("cart")
63+
storeCart, ok := m.Get("cart")
6664
if ok {
6765
order = checkOrder(aid, storeCart.(map[string]interface{}), reserveMap)
6866
if len(order) != 0 {
69-
m.LoadOrStore("order", order)
67+
m.Set("order", order)
7068
return
7169
}
7270
sleeps()
@@ -86,8 +84,8 @@ func main() {
8684
return
8785
default:
8886
go func() {
89-
storeCart, cok := m.Load("cart")
90-
storeOrder, ook := m.Load("order")
87+
storeCart, cok := m.Get("cart")
88+
storeOrder, ook := m.Get("order")
9189
if cok && ook {
9290
if submitOrder(aid, storeCart.(map[string]interface{}),
9391
storeOrder.(map[string]interface{}), reserveMap) {
@@ -176,6 +174,7 @@ func userInfo() url.Values {
176174
values["sharer_uid"] = []string{config.UserInfo.SharerUid}
177175
values["openid"] = []string{config.UserInfo.Openid}
178176
values["h5_source"] = []string{config.UserInfo.H5Source}
177+
values["s_id"] = []string{config.UserInfo.Sid}
179178
values["device_token"] = []string{config.UserInfo.DeviceToken}
180179
return values
181180
}
@@ -184,6 +183,7 @@ func userInfo() url.Values {
184183
func headers() http.Header {
185184
headerMap := map[string][]string{}
186185
headerMap["ddmc-city-number"] = []string{config.Headers.CityNumber}
186+
headerMap["ddmc-time"] = []string{fmt.Sprint(time.Now().Unix())}
187187
headerMap["ddmc-build-version"] = []string{config.Headers.BuildVersion}
188188
headerMap["ddmc-device-id"] = []string{config.Headers.DeviceId}
189189
headerMap["ddmc-station-id"] = []string{config.Headers.StationId}
@@ -245,9 +245,9 @@ func addressId() string {
245245
}
246246

247247
for _, m := range infos {
248-
flag, ok := m["is_default"]
248+
isDefault, ok := m["is_default"].(bool)
249249
if ok {
250-
if flag.(bool) {
250+
if isDefault {
251251
return m["id"].(string)
252252
}
253253
}
@@ -361,9 +361,9 @@ func carts() map[string]interface{} {
361361
func customizeReserve() map[string]int64 {
362362
switch reserve {
363363
case 2:
364-
return map[string]int64{"reserved_time_start": unix(6, 30), "reserved_time_end": unix(14, 30)}
365-
default:
366364
return map[string]int64{"reserved_time_start": unix(14, 30), "reserved_time_end": unix(22, 30)}
365+
default:
366+
return map[string]int64{"reserved_time_start": unix(6, 30), "reserved_time_end": unix(14, 30)}
367367
}
368368
}
369369

modules.go

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type Info struct {
1515
SharerUid string `yaml:"sharer_uid"`
1616
Openid string `yaml:"openid"`
1717
H5Source string `yaml:"h_5_source"`
18+
Sid string `yaml:"s_id"`
1819
DeviceToken string `yaml:"device_token"`
1920
}
2021
Headers struct {

0 commit comments

Comments
 (0)