Skip to content

topic-message queue which depends on memory/redis/kafka

Notifications You must be signed in to change notification settings

guanhg/topic-queue

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

2 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

topic-message queue

生产者(Producer)和消费者(Consumer)发送/消费批量消息(batch message)

  • memory模式 消息队列存储在内存中

    1. 当队列中消息数量满足batchSize(默认10)时,被发送/消费
    2. 当队列中的数据缓存超时时,被强制发送/消费
  • redis模式 消息被持久化到redis list中,每个主题一个list

    1. 生产者消息被暂时缓存,满足条件时批量发送,减少redis网络请求
    2. 消费者缓存消息,满足条件时批量消费 每次最多获取batchSize个消息,如果没有消费完,阻塞获取请求
  • kafka模式 kafka接口的简单包装

Example

// new queue
// memory
q := NewMQueue(WithBatchSize(5))

// redis
// client := rds.NewClient(&rds.Options{Addr: "localhost:6379"})
// q := NewRdsQueue(client, WithBatchSize(5))

// kafka
// client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, nil)
// q := NewKfQueue(client, WithBatchSize(5))

topicName := "belike"
q.AddTopic(topicName, func(topicName string, batch batchEntry) error {
    // consume handle
    for _, e := range batch.entries {
        var i int
        if err := json.Unmarshal(e.Data.([]byte), &i); err != nil {
            t.Error(err)
        }
        fmt.Printf("%s: %d \n", e.Key, i)
    }
    fmt.Println("batchSize: ", len(batch.entries))
    return nil
}, stdHandleErr)

for i := 0; i < 46; i++ {
    q.Produce(topicName, Entry{Key: "kmsg", Data: time.Now().Nanosecond()})
}

timer := time.NewTimer(time.Second * 3)
<-timer.C

q.Wait()

About

topic-message queue which depends on memory/redis/kafka

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages