Skip to content

Commit

Permalink
refactor: redis hget replace scan (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
710leo authored Apr 18, 2024
1 parent d752cb9 commit 01c1568
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 22 deletions.
18 changes: 5 additions & 13 deletions src/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,14 @@ func TableRecordCount(table, where string, args ...interface{}) (int64, error) {
})
}

func CacheKeyGets(ctx context.Context, prefix string) ([]string, error) {
iter := storage.Cache.Scan(ctx, 0, prefix, 0).Iterator()
keys := make([]string, 0)
for iter.Next(ctx) {
keys = append(keys, iter.Val())
}

return keys, iter.Err()
}
var IBEX_HOST_DOING = "ibex-host-doing"

func CacheRecordGets[T any](ctx context.Context, keys []string) ([]T, error) {
lst := make([]T, 0, len(keys))
values := storage.CacheMGet(ctx, keys)
func CacheRecordGets[T any](ctx context.Context) ([]T, error) {
lst := make([]T, 0)
values, _ := storage.Cache.HVals(ctx, IBEX_HOST_DOING).Result()
for _, val := range values {
t := new(T)
if err := json.Unmarshal(val, t); err != nil {
if err := json.Unmarshal([]byte(val), t); err != nil {
return nil, err
}
lst = append(lst, *t)
Expand Down
4 changes: 2 additions & 2 deletions src/models/task_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func MarkDoneStatus(id, clock int64, host, status, stdout, stderr string, edgeAl
}

func CacheMarkDone(ctx context.Context, taskHost TaskHost) error {
if err := storage.Cache.Del(ctx, hostDoingCacheKey(taskHost.Id, taskHost.Host)).Err(); err != nil {
if err := storage.Cache.HDel(ctx, IBEX_HOST_DOING, hostDoingCacheKey(taskHost.Id, taskHost.Host)).Err(); err != nil {
return err
}
TaskHostCachePush(taskHost)
Expand Down Expand Up @@ -227,7 +227,7 @@ func ReportCacheResult() error {
// id大于redis初始id,说明是edge与center失联时,本地告警规则触发的自愈脚本生成的id
// 为了防止不同边缘机房生成的脚本任务id相同,不上报结果至数据库
if th.Id >= storage.IDINITIAL {
logger.Infof("task[%s] host[%s] done, result:[%v]", th.Id, th.Host, th)
logger.Infof("task[%d] host[%s] done, result:[%v]", th.Id, th.Host, th)
} else {
reports = append(reports, th)
}
Expand Down
2 changes: 1 addition & 1 deletion src/models/task_host_doing.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (doing *TaskHostDoing) UnmarshalBinary(data []byte) error {
}

func hostDoingCacheKey(id int64, host string) string {
return fmt.Sprintf("host:doing:%s:%d", host, id)
return fmt.Sprintf("%s:%d", host, id)
}

var (
Expand Down
2 changes: 1 addition & 1 deletion src/models/task_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (taskMeta *TaskMeta) Cache(host string) error {

tx := storage.Cache.TxPipeline()
tx.Set(ctx, taskMetaCacheKey(taskMeta.Id), taskMeta, storage.DEFAULT)
tx.Set(ctx, hostDoingCacheKey(taskMeta.Id, host), &TaskHostDoing{
tx.HSet(ctx, IBEX_HOST_DOING, hostDoingCacheKey(taskMeta.Id, host), &TaskHostDoing{
Id: taskMeta.Id,
Host: host,
Clock: time.Now().Unix(),
Expand Down
7 changes: 2 additions & 5 deletions src/server/timer/timer_host_doing.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,8 @@ func cacheHostDoing() error {
}

ctx := context.Background()
keys, err := models.CacheKeyGets(ctx, "host:doing:*")
if err != nil {
logger.Errorf("models.CacheKeyGets fail: %v", err)
}
doingsFromRedis, err := models.CacheRecordGets[models.TaskHostDoing](ctx, keys)

doingsFromRedis, err := models.CacheRecordGets[models.TaskHostDoing](ctx)
if err != nil {
logger.Errorf("models.CacheRecordGets fail: %v", err)
}
Expand Down

0 comments on commit 01c1568

Please sign in to comment.