Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

使用WaitGroup, Channel和Context打造一个并发用户标签查询器 #21

Open
kevinyan815 opened this issue Oct 29, 2020 · 0 comments

Comments

@kevinyan815
Copy link
Owner

kevinyan815 commented Oct 29, 2020

这个例子是使用WaitGroup编排多个goroutine并发完成用户多个标签数据的查询工作。至于每个标签应该怎么查询则由具体的标签查询实现类型实现Picker接口的PickTagValueForUser方法,在方法里实现自己标签的查询逻辑。

下面的Picker接口约定了每个标签查询器必须要实现的两个方法

type Picker interface {
	// 用于查询用户的标签值
	PickTagValueForUser (userId int64, args ... interface{})
	// 通知查询到的标签值
	Notify () <-chan interface{}
}

resolveTagPicker函数用于通过标签名解析出每个对应标签查询器对象。

// 根据标签名解析出对应的TagPicker
func resolveTagPicker (tagName string) Picker {
	switch tagName {
	case TAG_ORDER_NUM: // 这是个常量 值是字符串order_num 
		return &OrderNumTagPicker{
			TagName: tagName,
			ValueCh: make(chan interface{}),
		}
	default:
		return nil
	}
}

下面是对外提供的对多个用户标签进行并发查询的查询方法,BulkQueryUserTagValue会根据要查询的多个用户标签每个都开启一个goroutine 执行queryTagValue方法。 queryTagValue方法就是每个标签查询器执行的goroutine,它会根据标签标识实例化出相应的标签查询器,然后再开启一个goroutine执行标签查询器实现的PickTagValueForUser方法,查询到后会通过标签查询器的Channel获得标签值。这里再开一个goroutine去执行PickTagValueForUser方法的原因是要做好查询器的超时处理。
queryTagValue会同时接收标签查询器查询到的结果值(通过Notify方法返回的Channel)和ctx.Done() 这个Channel。 如果在ctx.Done()通道接收到值时还没有从查询器的Channel接收到标签值,则视为超时。

// 对外提供的批量查询用户标签值的方法
func BulkQueryUserTagValue(tagNames []string, userId int64, queryArgs ...interface{}) (tagValuePairs []*TagValuePair) {
	tagCount := len(tagNames)
	if tagCount < 1 {
		return
	}
	wg := &sync.WaitGroup{}
	wg.Add(tagCount)
	tagValueCh := make(chan *TagValuePair, tagCount) // 用于接收所有Picker查到的标签值的Channel
	ctx, _ := context.WithTimeout(context.Background(), time.Minute) // 设置执行标签值查找的超时时间
	for _, tagName := range tagNames {
		go queryTagValue(ctx, wg, tagName, userId, tagValueCh, queryArgs...)
	}
	wg.Wait()
	close(tagValueCh) // 先关闭通道 方便下面for range不发生阻塞, 从channel中读完值即退出
	tagValuePairs = make([]*TagValuePair, 0)
	for tagValue := range tagValueCh {
		if tagValue.Value != nil {
			tagValuePairs = append(tagValuePairs, tagValue)
		}
	}

	return tagValuePairs
}

type TagValuePair struct {
	Name string `json:"tag_name"`
	Value interface{} `json:"tag_value"`
}


func queryTagValue(ctx context.Context, wg *sync.WaitGroup, tagName string, userId int64, tagValueCh chan *TagValuePair, queryArgs ...interface{}) {
	defer wg.Done()
	tagPicker := resolveTagPicker(tagName)
	if tagPicker == nil {
		dlog.Error("未识别的业务标签", common.ErrUnknownBusinessTag)
		return
	}
	go tagPicker.PickTagValueForUser(userId, queryArgs...)
	select {
	case <- ctx.Done(): // 超时返回
		return
	case tagValue := <- tagPicker.Notify(): // 接收标签值
		TagValuePair := &TagValuePair{
			Name:  tagName,
			Value: tagValue,
		}
		tagValueCh <- TagValuePair

		return
	}
}

最后就是具体标签查询器的实现了,每个标签都有自己的实现逻辑,下面是OrderNumTagPicker的示例代码:

type OrderNumTagPicker struct {
	TagName string
	ValueCh chan interface{}
}

type TradeNoInfo struct {
	TradeNo string `json:"trade_no"`
}

func (picker *OrderNumTagPicker) PickTagValueForUser(userId int64, args ...interface{}) {
	// 用类型转换得到交易号
       	// tradeNo, ok := args[0].(string)
        extInfoJson, _ := args[0].(string)

	if err := json.Unmarshal([]byte(extInfoJson), &TradeNoInfo); err != nil {
                // log.Error自己实现 
		log.Error("PayTotalTagPickerError", "Invalid arg", args[0])
		// 结束执行并通知外部
		picker.ValueCh <- nil
		return
	}
        // 这里就打印下参数值,标签查询的具体逻辑自己实现
        fmt.Println(userId)
        fmt.Println(TradeNo)
	// 查询到的用户的标签 (假设交易号下有10个订单)
	picker.ValueCh <- 10
}

func (picker *OrderNumTagPicker) Notify () <-chan interface{} {
	return picker.ValueCh
}

完整的源代码参考: https://github.com/kevinyan815/gocookbook/tree/master/codes/tag_picker

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant