How to control the message consumption #1938
Comments
When you call Consume on your consumer group instance, you pass a parent context.Context which will be passed through to the session and can be used to interrupt it and stop consumption. So you'd cancel that context to break out of the existing consumption and not call Consume again.
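A minimal, self-contained sketch of that pattern (the broker address, group, and topic names are placeholders, and the handler is stripped down to the bare interface methods):

package main

import (
    "context"
    "log"
    "os"
    "os/signal"
    "syscall"

    "github.com/Shopify/sarama"
)

// noopHandler satisfies sarama.ConsumerGroupHandler and just marks each message.
type noopHandler struct{}

func (noopHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (noopHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (noopHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        sess.MarkMessage(msg, "")
    }
    return nil
}

func main() {
    cfg := sarama.NewConfig()
    cfg.Version = sarama.V2_1_0_0

    group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", cfg)
    if err != nil {
        log.Fatalln(err)
    }
    defer group.Close()

    // The parent context passed to Consume is what lets us interrupt the session.
    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        // Cancel the context on SIGINT/SIGTERM so the Consume loop below exits.
        sig := make(chan os.Signal, 1)
        signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
        <-sig
        cancel()
    }()

    for {
        if err := group.Consume(ctx, []string{"example-topic"}, noopHandler{}); err != nil {
            log.Fatalln(err)
        }
        if ctx.Err() != nil {
            return // context cancelled: stop and do not call Consume again
        }
    }
}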
thanks for the clarification @dnwe
Glad I could help!
Hi @dnwe,
package main
// SIGUSR1 toggles pause/resume of consumption
import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
"github.com/Shopify/sarama"
)
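// ConsumptionControl gates the consume loop: while isAllowed is false the loop
// blocks on the condition variable, and cancelFunc cancels the context currently
// passed to Consume so the active session can be interrupted.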
type ConsumptionControl struct {
isAllowed bool
controller *sync.Cond
cancelFunc context.CancelFunc
}
// Sarama configuration options
var (
brokers = ""
version = ""
group = ""
topics = ""
assignor = ""
oldest = true
verbose = false
)
func init() {
flag.StringVar(&brokers, "brokers", "", "Kafka bootstrap brokers to connect to, as a comma separated list")
flag.StringVar(&group, "group", "", "Kafka consumer group definition")
flag.StringVar(&version, "version", "2.1.1", "Kafka cluster version")
flag.StringVar(&topics, "topics", "", "Kafka topics to be consumed, as a comma separated list")
flag.StringVar(&assignor, "assignor", "range", "Consumer group partition assignment strategy (range, roundrobin, sticky)")
flag.BoolVar(&oldest, "oldest", true, "Kafka consumer consume initial offset from oldest")
flag.BoolVar(&verbose, "verbose", false, "Sarama logging")
flag.Parse()
if len(brokers) == 0 {
panic("no Kafka bootstrap brokers defined, please set the -brokers flag")
}
if len(topics) == 0 {
panic("no topics given to be consumed, please set the -topics flag")
}
if len(group) == 0 {
panic("no Kafka consumer group defined, please set the -group flag")
}
}
func main() {
keepRunning := true
log.Println("Starting a new Sarama consumer")
if verbose {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
}
version, err := sarama.ParseKafkaVersion(version)
if err != nil {
log.Panicf("Error parsing Kafka version: %v", err)
}
/**
* Construct a new Sarama configuration.
* The Kafka cluster version has to be defined before the consumer/producer is initialized.
*/
config := sarama.NewConfig()
config.Version = version
switch assignor {
case "sticky":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
case "roundrobin":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
case "range":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
default:
log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
}
if oldest {
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}
/**
* Setup a new Sarama consumer group
*/
consumer := Consumer{
ready: make(chan bool),
}
ctx, cancelConsumption := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
if err != nil {
log.Panicf("Error creating consumer group client: %v", err)
}
go func() {
for err := range client.Errors() {
fmt.Println("Error: ", err)
}
fmt.Println("<<<<<<<<<<<<<<<<<<<<<<<<< exiting error routine ")
}()
consumptionController := &ConsumptionControl{
isAllowed: true,
controller: sync.NewCond(&sync.Mutex{}),
cancelFunc: cancelConsumption,
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for keepRunning {
fmt.Println(">>>>> before consume")
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
log.Panicf("Error from consumer: %v", err)
}
fmt.Println("<<<<< after consume")
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
fmt.Println("ctx err:", ctx.Err().Error())
return
}
fmt.Println("checking consumption allowance")
consumptionController.controller.L.Lock()
for !consumptionController.isAllowed {
fmt.Println("[!!!] pausing the consumption")
consumptionController.controller.Wait()
fmt.Println("[!!!] resuming the consumption")
}
consumptionController.controller.L.Unlock()
consumer.ready = make(chan bool)
}
}()
<-consumer.ready // Wait until the consumer has been set up
log.Println("Sarama consumer up and running!...")
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
sigusr1 := make(chan os.Signal, 1)
signal.Notify(sigusr1, syscall.SIGUSR1)
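// SIGINT/SIGTERM terminate the program; SIGUSR1 toggles pause/resume of consumption.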
for keepRunning {
select {
case <-ctx.Done():
log.Println("terminating: context cancelled")
keepRunning = false
case <-sigterm:
log.Println("terminating: via signal")
keepRunning = false
case <-sigusr1:
ctx = toggleConsumptionFlow(ctx, consumptionController)
}
}
cancelConsumption()
// free consumer routine if it is blocked by the consumption control
consumptionController.controller.L.Lock()
consumptionController.isAllowed = true
consumptionController.controller.Broadcast()
consumptionController.controller.L.Unlock()
fmt.Println("waiting consumer")
wg.Wait()
if err = client.Close(); err != nil {
log.Panicf("Error closing client: %v", err)
}
}
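// toggleConsumptionFlow pauses or resumes consumption: pausing cancels the context
// currently used by Consume (interrupting the active session) and installs a fresh
// one for the next call, while resuming wakes the consume goroutine waiting on the
// condition variable.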
func toggleConsumptionFlow(ctx context.Context, consumptionController *ConsumptionControl) context.Context {
consumptionController.controller.L.Lock()
defer consumptionController.controller.L.Unlock()
if consumptionController.isAllowed {
consumptionController.isAllowed = !consumptionController.isAllowed
consumptionController.cancelFunc()
ctx, consumptionController.cancelFunc = context.WithCancel(context.Background())
} else {
consumptionController.isAllowed = !consumptionController.isAllowed
consumptionController.controller.Broadcast()
}
fmt.Println("toggled consumption to ", consumptionController.isAllowed)
return ctx
}
// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
}
// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(session sarama.ConsumerGroupSession) error {
fmt.Println("### SETUP. Claims:", session.Claims())
// Mark the consumer as ready
close(consumer.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(session sarama.ConsumerGroupSession) error {
fmt.Println("### CLEANUP. Claims:", session.Claims())
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
fmt.Println(">>>>> consume claim", claim.Topic(), claim.Partition())
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
log.Printf("Message claimed[t: %s p: %d, o: %d]: key = %s value = %s, timestamp = %v",
message.Topic, message.Partition, message.Offset,
string(message.Key), string(message.Value), message.Timestamp)
// simulate the processing time
time.Sleep(time.Second)
log.Printf("Processed[t: %s p: %d, o: %d]", message.Topic, message.Partition, message.Offset)
session.MarkMessage(message, "")
}
fmt.Println("<<<<< consume claim", claim.Topic(), claim.Partition())
return nil
}
On top of what @raulnegreiros said, I think in Java it is also possible to pause consumption only for some partition(s).
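With the approach in this thread there is no built-in per-partition equivalent, but it can be approximated by gating each claim's processing loop on its own flag instead of one global flag. A rough, illustrative sketch (my own helper, not part of Sarama; it reuses the sync import from the example above, and it only pauses the handler's processing loop, so messages Sarama has already fetched stay buffered in memory):

// pausedPartitions tracks topic/partition pairs whose processing is on hold.
type pausedPartitions struct {
    mu     sync.Mutex
    cond   *sync.Cond
    paused map[string]map[int32]bool
}

func newPausedPartitions() *pausedPartitions {
    p := &pausedPartitions{paused: make(map[string]map[int32]bool)}
    p.cond = sync.NewCond(&p.mu)
    return p
}

// setPaused marks or unmarks a single partition and wakes any waiting claim loops.
func (p *pausedPartitions) setPaused(topic string, partition int32, paused bool) {
    p.mu.Lock()
    if p.paused[topic] == nil {
        p.paused[topic] = make(map[int32]bool)
    }
    p.paused[topic][partition] = paused
    p.cond.Broadcast()
    p.mu.Unlock()
}

// wait blocks while the given topic/partition is paused.
func (p *pausedPartitions) wait(topic string, partition int32) {
    p.mu.Lock()
    for p.paused[topic][partition] {
        p.cond.Wait()
    }
    p.mu.Unlock()
}

ConsumeClaim would then call wait(claim.Topic(), claim.Partition()) before handling each message, so only the paused partitions' loops block while the others keep consuming.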
Is there a way to control the consumption flow?
I know that the Kafka Java API has the pause and resume methods, but I can't find anything similar in Sarama. What is the recommended way to pause consumption while the processing mechanism has a problem (a database connection issue, for example)?