Skip to content

Commit

Permalink
add concurrent map and fix reserve time and init transport
Browse files Browse the repository at this point in the history
  • Loading branch information
baerwang committed Apr 21, 2022
1 parent ace8f91 commit f6548df
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 16 deletions.
156 changes: 156 additions & 0 deletions concurrent_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package main

import (
"encoding/json"
"sync"
)

var SHARD_COUNT = 32

// A "thread" safe map of type string:Anything.
// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
type ConcurrentMap []*ConcurrentMapShared

// A "thread" safe string to anything map.
type ConcurrentMapShared struct {
items map[string]interface{}
sync.RWMutex // Read Write mutex, guards access to internal map.
}

// Creates a new concurrent map.
func NewConcurrentMap() ConcurrentMap {
m := make(ConcurrentMap, SHARD_COUNT)
for i := 0; i < SHARD_COUNT; i++ {
m[i] = &ConcurrentMapShared{items: make(map[string]interface{})}
}
return m
}

// Returns shard under given key
func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared {
return m[uint(fnv32(key))%uint(SHARD_COUNT)]
}

// Sets the given value under the specified key.
func (m ConcurrentMap) Set(key string, value interface{}) {
// Get map shard.
shard := m.GetShard(key)
shard.Lock()
shard.items[key] = value
shard.Unlock()
}

// Retrieves an element from map under given key.
func (m ConcurrentMap) Get(key string) (interface{}, bool) {
// Get shard
shard := m.GetShard(key)
shard.RLock()
// Get item from shard.
val, ok := shard.items[key]
shard.RUnlock()
return val, ok
}

// Used by the Iter & IterBuffered functions to wrap two variables together over a channel,
type Tuple struct {
Key string
Val interface{}
}

// Returns a buffered iterator which could be used in a for range loop.
func (m ConcurrentMap) IterBuffered() <-chan Tuple {
chans := snapshot(m)
total := 0
for _, c := range chans {
total += cap(c)
}
ch := make(chan Tuple, total)
go fanIn(chans, ch)
return ch
}

// Returns a array of channels that contains elements in each shard,
// which likely takes a snapshot of `m`.
// It returns once the size of each buffered channel is determined,
// before all the channels are populated using goroutines.
func snapshot(m ConcurrentMap) (chans []chan Tuple) {
chans = make([]chan Tuple, SHARD_COUNT)
wg := sync.WaitGroup{}
wg.Add(SHARD_COUNT)
// Foreach shard.
for index, shard := range m {
go func(index int, shard *ConcurrentMapShared) {
// Foreach key, value pair.
shard.RLock()
chans[index] = make(chan Tuple, len(shard.items))
wg.Done()
for key, val := range shard.items {
chans[index] <- Tuple{key, val}
}
shard.RUnlock()
close(chans[index])
}(index, shard)
}
wg.Wait()
return chans
}

// fanIn reads elements from channels `chans` into channel `out`
func fanIn(chans []chan Tuple, out chan Tuple) {
wg := sync.WaitGroup{}
wg.Add(len(chans))
for _, ch := range chans {
go func(ch chan Tuple) {
for t := range ch {
out <- t
}
wg.Done()
}(ch)
}
wg.Wait()
close(out)
}

// Reviles ConcurrentMap "private" variables to json marshal.
func (m ConcurrentMap) MarshalJSON() ([]byte, error) {
// Create a temporary map, which will hold all item spread across shards.
tmp := make(map[string]interface{})

// Insert items to temporary map.
for item := range m.IterBuffered() {
tmp[item.Key] = item.Val
}
return json.Marshal(tmp)
}

func fnv32(key string) uint32 {
hash := uint32(2166136261)
const prime32 = uint32(16777619)
for i := 0; i < len(key); i++ {
hash *= prime32
hash ^= uint32(key[i])
}
return hash
}

// Concurrent map uses Interface{} as its value, therefor JSON Unmarshal
// will probably won't know which to type to unmarshal into, in such case
// we'll end up with a value of type map[string]interface{}, In most cases this isn't
// out value type, this is why we've decided to remove this functionality.

// func (m *ConcurrentMap) UnmarshalJSON(b []byte) (err error) {
// // Reverse process of Marshal.

// tmp := make(map[string]interface{})

// // Unmarshal into a single map.
// if err := json.Unmarshal(b, &tmp); err != nil {
// return nil
// }

// // foreach key,value pair in temporary map insert into our concurrent map.
// for key, val := range tmp {
// m.Set(key, val)
// }
// return nil
// }
1 change: 1 addition & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ userinfo:
openid: ""
h_5_source: ""
device_token: ""
s_id: ""

headers:
ddmc_city_number: ""
Expand Down
32 changes: 16 additions & 16 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"
)

Expand All @@ -32,19 +31,18 @@ func main() {

handle()

var (
cart, order map[string]interface{}
m sync.Map
)
var cart, order map[string]interface{}

m := NewConcurrentMap()

cart = carts()

if len(cart) != 0 {
m.LoadOrStore("cart", cart)
m.Set("cart", cart)
}

if len(order) != 0 {
m.LoadOrStore("order", order)
m.Set("order", order)
}

reserveMap := customizeReserve()
Expand All @@ -55,18 +53,18 @@ func main() {
go execute(notify, func() {
cart = carts()
if len(cart) != 0 {
m.LoadOrStore("cart", cart)
m.Set("cart", cart)
return
}
sleeps()
})

go execute(notify, func() {
storeCart, ok := m.Load("cart")
storeCart, ok := m.Get("cart")
if ok {
order = checkOrder(aid, storeCart.(map[string]interface{}), reserveMap)
if len(order) != 0 {
m.LoadOrStore("order", order)
m.Set("order", order)
return
}
sleeps()
Expand All @@ -86,8 +84,8 @@ func main() {
return
default:
go func() {
storeCart, cok := m.Load("cart")
storeOrder, ook := m.Load("order")
storeCart, cok := m.Get("cart")
storeOrder, ook := m.Get("order")
if cok && ook {
if submitOrder(aid, storeCart.(map[string]interface{}),
storeOrder.(map[string]interface{}), reserveMap) {
Expand Down Expand Up @@ -176,6 +174,7 @@ func userInfo() url.Values {
values["sharer_uid"] = []string{config.UserInfo.SharerUid}
values["openid"] = []string{config.UserInfo.Openid}
values["h5_source"] = []string{config.UserInfo.H5Source}
values["s_id"] = []string{config.UserInfo.Sid}
values["device_token"] = []string{config.UserInfo.DeviceToken}
return values
}
Expand All @@ -184,6 +183,7 @@ func userInfo() url.Values {
func headers() http.Header {
headerMap := map[string][]string{}
headerMap["ddmc-city-number"] = []string{config.Headers.CityNumber}
headerMap["ddmc-time"] = []string{fmt.Sprint(time.Now().Unix())}
headerMap["ddmc-build-version"] = []string{config.Headers.BuildVersion}
headerMap["ddmc-device-id"] = []string{config.Headers.DeviceId}
headerMap["ddmc-station-id"] = []string{config.Headers.StationId}
Expand Down Expand Up @@ -245,9 +245,9 @@ func addressId() string {
}

for _, m := range infos {
flag, ok := m["is_default"]
isDefault, ok := m["is_default"].(bool)
if ok {
if flag.(bool) {
if isDefault {
return m["id"].(string)
}
}
Expand Down Expand Up @@ -361,9 +361,9 @@ func carts() map[string]interface{} {
func customizeReserve() map[string]int64 {
switch reserve {
case 2:
return map[string]int64{"reserved_time_start": unix(6, 30), "reserved_time_end": unix(14, 30)}
default:
return map[string]int64{"reserved_time_start": unix(14, 30), "reserved_time_end": unix(22, 30)}
default:
return map[string]int64{"reserved_time_start": unix(6, 30), "reserved_time_end": unix(14, 30)}
}
}

Expand Down
1 change: 1 addition & 0 deletions modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Info struct {
SharerUid string `yaml:"sharer_uid"`
Openid string `yaml:"openid"`
H5Source string `yaml:"h_5_source"`
Sid string `yaml:"s_id"`
DeviceToken string `yaml:"device_token"`
}
Headers struct {
Expand Down

0 comments on commit f6548df

Please sign in to comment.