Skip to content

Commit

Permalink
Merge pull request #16 from zhoushuguang/article-five
Browse files Browse the repository at this point in the history
article five
  • Loading branch information
zhoushuguang authored Jun 17, 2022
2 parents 3c8fcc0 + 7da7a44 commit f0c5955
Show file tree
Hide file tree
Showing 20 changed files with 642 additions and 71 deletions.
9 changes: 6 additions & 3 deletions apps/product/rpc/etc/product.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
Name: product.rpc
ListenOn: 127.0.0.1:8081
Mode: dev
Etcd:
Hosts:
- 127.0.0.1:2379
Key: product.rpc
DataSource: root:123456@tcp(127.0.0.1:3306)/product
DataSource: root:123456@tcp(127.0.0.1:3306)/product?parseTime=true
CacheRedis:
- Host: 127.0.0.1:6379
Pass:
Type: node
Telemetry:
Endpoint: http://127.0.0.1:14268/api/traces
BizRedis:
Host: 127.0.0.1:6379
Pass:
Type: node
2 changes: 2 additions & 0 deletions apps/product/rpc/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package config

import (
"github.com/zeromicro/go-zero/core/stores/cache"
"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/zrpc"
)

type Config struct {
zrpc.RpcServerConf
DataSource string
CacheRedis cache.CacheConf
BizRedis redis.RedisConf
}
189 changes: 189 additions & 0 deletions apps/product/rpc/internal/logic/productlistlogic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package logic

import (
"context"
"fmt"
"strconv"
"time"

"github.com/zhoushuguang/lebron/apps/product/rpc/internal/model"
"github.com/zhoushuguang/lebron/apps/product/rpc/internal/svc"
"github.com/zhoushuguang/lebron/apps/product/rpc/product"

"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/mr"
"github.com/zeromicro/go-zero/core/threading"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
defaultPageSize = 10
defaultLimit = 300
expireTime = 3600 * 24 * 3
)

type ProductListLogic struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}

func NewProductListLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ProductListLogic {
return &ProductListLogic{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}

func (l *ProductListLogic) ProductList(in *product.ProductListRequest) (*product.ProductListResponse, error) {
_, err := l.svcCtx.CategoryModel.FindOne(l.ctx, int64(in.CategoryId))
if err == model.ErrNotFound {
return nil, status.Error(codes.NotFound, "category not found")
}
if in.Cursor == 0 {
in.Cursor = time.Now().Unix()
}
if in.Ps == 0 {
in.Ps = defaultPageSize
}
var (
isCache, isEnd bool
lastID, lastTime int64
firstPage []*product.ProductItem
products []*model.Product
)
pids, _ := l.cacheProductList(l.ctx, in.CategoryId, in.Cursor, int64(in.Ps))
if len(pids) == int(in.Ps) {
isCache = true
if pids[len(pids)-1] == -1 {
isEnd = true
}
products, err := l.productsByIds(l.ctx, pids)
if err != nil {
return nil, err
}
for _, p := range products {
firstPage = append(firstPage, &product.ProductItem{
ProductId: p.Id,
Name: p.Name,
CreateTime: p.CreateTime.Unix(),
})
}
} else {
var (
err error
ctime = time.Unix(in.Cursor, 0).Format("2006-01-02 15:04:05")
)
products, err = l.svcCtx.ProductModel.CategoryProducts(l.ctx, ctime, int64(in.CategoryId), defaultLimit)
if err != nil {
return nil, err
}
var firstPageProducts []*model.Product
if len(products) > int(in.Ps) {
firstPageProducts = products[:int(in.Ps)]
} else {
firstPageProducts = products
isEnd = true
}
for _, p := range firstPageProducts {
firstPage = append(firstPage, &product.ProductItem{
ProductId: p.Id,
Name: p.Name,
CreateTime: p.CreateTime.Unix(),
})
}
}
if len(firstPage) > 0 {
pageLast := firstPage[len(firstPage)-1]
lastID = pageLast.ProductId
lastTime = pageLast.CreateTime
if lastTime < 0 {
lastTime = 0
}
for k, p := range firstPage {
if p.CreateTime == in.Cursor && p.ProductId == in.ProductId {
firstPage = firstPage[k:]
break
}
}
}
ret := &product.ProductListResponse{
IsEnd: isEnd,
Timestamp: lastTime,
ProductId: lastID,
Products: firstPage,
}
if !isCache {
threading.GoSafe(func() {
if len(products) < defaultLimit && len(products) > 0 {
endTime, _ := time.Parse("2006-01-02 15:04:05", "0000-00-00 00:00:00")
products = append(products, &model.Product{Id: -1, CreateTime: endTime})
}
_ = l.addCacheProductList(context.Background(), products)
})
}
return ret, nil
}

func (l *ProductListLogic) productsByIds(ctx context.Context, pids []int64) ([]*model.Product, error) {
products, err := mr.MapReduce(func(source chan<- interface{}) {
for _, pid := range pids {
source <- pid
}
}, func(item interface{}, writer mr.Writer, cancel func(error)) {
pid := item.(int64)
p, err := l.svcCtx.ProductModel.FindOne(ctx, pid)
if err != nil {
cancel(err)
return
}
writer.Write(p)
}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
var ps []*model.Product
for item := range pipe {
p := item.(*model.Product)
ps = append(ps, p)
}
writer.Write(ps)
})
if err != nil {
return nil, err
}
return products.([]*model.Product), nil
}

func (l *ProductListLogic) cacheProductList(ctx context.Context, cid int32, cursor, ps int64) ([]int64, error) {
pairs, err := l.svcCtx.BizRedis.ZrevrangebyscoreWithScoresAndLimitCtx(ctx, categoryKey(cid), cursor, 0, 0, int(ps))
if err != nil {
return nil, err
}
var ids []int64
for _, pair := range pairs {
id, _ := strconv.ParseInt(pair.Key, 10, 64)
ids = append(ids, id)
}
return ids, nil
}

func (l *ProductListLogic) addCacheProductList(ctx context.Context, products []*model.Product) error {
if len(products) == 0 {
return nil
}
for _, p := range products {
score := p.CreateTime.Unix()
if score < 0 {
score = 0
}
_, err := l.svcCtx.BizRedis.ZaddCtx(ctx, categoryKey(int32(p.Cateid)), score, strconv.Itoa(int(p.Id)))
if err != nil {
return err
}
}
return l.svcCtx.BizRedis.ExpireCtx(ctx, categoryKey(int32(products[0].Cateid)), expireTime)
}

func categoryKey(cid int32) string {
return fmt.Sprintf("category:%d", cid)
}
7 changes: 6 additions & 1 deletion apps/product/rpc/internal/logic/productlogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package logic

import (
"context"
"fmt"

"github.com/zhoushuguang/lebron/apps/product/rpc/internal/model"
"github.com/zhoushuguang/lebron/apps/product/rpc/internal/svc"
"github.com/zhoushuguang/lebron/apps/product/rpc/product"

Expand All @@ -24,10 +26,13 @@ func NewProductLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ProductLo
}

func (l *ProductLogic) Product(in *product.ProductItemRequest) (*product.ProductItem, error) {
p, err := l.svcCtx.ProductModel.FindOne(l.ctx, in.ProductId)
v, err, _ := l.svcCtx.SingleGroup.Do(fmt.Sprintf("product:%d", in.ProductId), func() (interface{}, error) {
return l.svcCtx.ProductModel.FindOne(l.ctx, in.ProductId)
})
if err != nil {
return nil, err
}
p := v.(*model.Product)
return &product.ProductItem{
ProductId: p.Id,
Name: p.Name,
Expand Down
2 changes: 1 addition & 1 deletion apps/product/rpc/internal/logic/productslogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package logic

import (
"context"
"github.com/zhoushuguang/lebron/apps/product/rpc/internal/model"
"strconv"
"strings"

"github.com/zhoushuguang/lebron/apps/product/rpc/internal/model"
"github.com/zhoushuguang/lebron/apps/product/rpc/internal/svc"
"github.com/zhoushuguang/lebron/apps/product/rpc/product"

Expand Down
12 changes: 12 additions & 0 deletions apps/product/rpc/internal/model/productmodel.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package model

import (
"context"
"fmt"
"github.com/zeromicro/go-zero/core/stores/cache"
"github.com/zeromicro/go-zero/core/stores/sqlx"
)
Expand All @@ -12,6 +14,7 @@ type (
// and implement the added methods in customProductModel.
ProductModel interface {
productModel
CategoryProducts(ctx context.Context, ctime string, cateid, limit int64) ([]*Product, error)
}

customProductModel struct {
Expand All @@ -25,3 +28,12 @@ func NewProductModel(conn sqlx.SqlConn, c cache.CacheConf) ProductModel {
defaultProductModel: newProductModel(conn, c),
}
}

func (m *customProductModel) CategoryProducts(ctx context.Context, ctime string, cateid, limit int64) ([]*Product, error) {
var products []*Product
err := m.QueryRowsNoCacheCtx(ctx, &products, fmt.Sprintf("select %s from %s where cateid=? and status=1 and create_time<? order by create_time desc limit ?", productRows, m.table), cateid, ctime, limit)
if err != nil {
return nil, err
}
return products, nil
}
22 changes: 11 additions & 11 deletions apps/product/rpc/internal/model/productmodel_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions apps/product/rpc/internal/server/productserver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 11 additions & 5 deletions apps/product/rpc/internal/svc/servicecontext.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
package svc

import (
"github.com/zeromicro/go-zero/core/stores/sqlx"
"github.com/zhoushuguang/lebron/apps/product/rpc/internal/config"
"github.com/zhoushuguang/lebron/apps/product/rpc/internal/model"

"github.com/zeromicro/go-zero/core/stores/redis"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"golang.org/x/sync/singleflight"
)

type ServiceContext struct {
Config config.Config
ProductModel model.ProductModel
Config config.Config
ProductModel model.ProductModel
CategoryModel model.CategoryModel
BizRedis *redis.Redis
SingleGroup singleflight.Group
}

func NewServiceContext(c config.Config) *ServiceContext {
conn := sqlx.NewMysql(c.DataSource)
return &ServiceContext{
Config: c,
ProductModel: model.NewProductModel(conn, c.CacheRedis),
Config: c,
ProductModel: model.NewProductModel(conn, c.CacheRedis),
CategoryModel: model.NewCategoryModel(conn, c.CacheRedis),
BizRedis: redis.New(c.BizRedis.Host, redis.WithPass(c.BizRedis.Pass)),
}
}
1 change: 0 additions & 1 deletion apps/product/rpc/product.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ func main() {

s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
product.RegisterProductServer(grpcServer, svr)

if c.Mode == service.DevMode || c.Mode == service.TestMode {
reflection.Register(grpcServer)
}
Expand Down
Loading

0 comments on commit f0c5955

Please sign in to comment.