client: add PurgeTopicsFromClient
This allows users to clean up resources on long-lived clients. We now
allow the user to purge resources from the client for topics the user is
no longer interested in ever producing to or consuming from.

This also finally addresses two long-standing TODOs in the client which
were only ever relevant if we introduced the concept of losing
assignments.

This also fixes a minor issue where the client would track regular
expressions as topic names (even though these were never requested as topics).

Purging while producing has some caveats which are disclosed in the
documentation.

I've tested that purging works by manually checking the entire contents
of the client struct after purging with github.com/davecgh/go-spew/spew.
I've also tested that producing to a purged topic works properly (which
required looking into why some things were dropped -- sequence numbers!),
and I've repeatedly tested purging and adding (next commit) while
consuming.

Long term it would be beneficial to have integration tests here.

Originally, this method was going to be split into producing and
consuming variants (PurgeProduceTopics, PurgeConsumeTopics), but a user
is likely not producing to and consuming from the same topic in the same
client, and if they are and want to purge, odds are they want to purge
both sides. If there is a feature request in the future to separate
them, new APIs can be added.
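
A minimal usage sketch of the new call, following the flush-before-purge
advice in the documentation (the helper function and its parameters below
are illustrative only, not part of this change):

package kgoexample

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

// produceAndPurge produces to a short-lived topic and then drops all
// client-side state for it once nothing is left in flight.
func produceAndPurge(ctx context.Context, cl *kgo.Client, topic string, values [][]byte) error {
	for _, v := range values {
		cl.Produce(ctx, &kgo.Record{Topic: topic, Value: v}, nil)
	}
	// Flush so no records for this topic are still buffered or in
	// flight, then purge the topic from the client.
	if err := cl.Flush(ctx); err != nil {
		return err
	}
	cl.PurgeTopicsFromClient(topic)
	return nil
}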
twmb committed Mar 1, 2022
1 parent 3112662 commit d178e26
Showing 7 changed files with 204 additions and 17 deletions.
41 changes: 41 additions & 0 deletions pkg/kgo/client.go
@@ -241,6 +241,47 @@ func (cl *Client) Ping(ctx context.Context) error {
return lastErr
}

// PurgeTopicsFromClient removes all internal information about the input
// topics.
//
// For producing, this clears all knowledge that these topics have ever been
// produced to. Producing to the topic again may result in out-of-order
// sequence number errors, or, if idempotency is disabled and the sequence
// numbers align, may result in invisibly discarded records at the broker.
// Purging a topic that was previously produced to may be useful to free up
// resources if you produce to many disparate, short-lived topics over the
// lifetime of this client and you do not plan to produce to the topic
// anymore. You may want to flush buffered records before purging if records
// for a topic you are purging are currently in flight.
//
// For consuming, this removes all knowledge of the topic from the consumer.
// This is different from PauseFetchTopics, which literally pauses the fetching
// of topics but keeps the topic information around for resuming fetching
// later. Purging a topic that was being consumed can be useful if you know the
// topic no longer exists, or if you are consuming via regex and know that some
// previously consumed topics no longer exist, or if you simply do not want to
// ever consume from a topic again. If you are group consuming, this function
// will likely cause a rebalance.
func (cl *Client) PurgeTopicsFromClient(topics ...string) {
if len(topics) == 0 {
return
}
sort.Strings(topics) // for logging in the functions
cl.blockingMetadataFn(func() { // make reasoning about concurrency easier
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
cl.purgeProduceTopics(topics)
}()
go func() {
defer wg.Done()
cl.purgeConsumeTopics(topics)
}()
wg.Wait()
})
}

// Parse broker IP/host and port from a string, using the default Kafka port if
// unspecified. Supported address formats:
//
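The consuming behavior documented above (purging instead of pausing, regex
consuming, the likely group rebalance) might be used as in this sketch; the
group name, regex, seed broker, and the deletedTopics channel are
assumptions for illustration:

package kgoexample

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

// consumeWithPurge group-consumes topics matching a regex and purges any
// topic it learns (out of band, via deletedTopics) no longer exists.
func consumeWithPurge(ctx context.Context, deletedTopics <-chan string) error {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumerGroup("example-group"),
		kgo.ConsumeRegex(),
		kgo.ConsumeTopics("^events-.*"),
	)
	if err != nil {
		return err
	}
	defer cl.Close()

	for {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		fetches := cl.PollFetches(ctx)
		fetches.EachRecord(func(r *kgo.Record) {
			_ = r // process the record
		})

		// Unlike PauseFetchTopics, purging drops the topic entirely;
		// in a group, this likely triggers a rebalance.
		select {
		case topic := <-deletedTopics:
			cl.PurgeTopicsFromClient(topic)
		default:
		}
	}
}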
57 changes: 51 additions & 6 deletions pkg/kgo/consumer.go
@@ -512,6 +512,46 @@ func (cl *Client) setOffsets(setOffsets map[string]map[int32]EpochOffset, log bo
}
}

// This is guaranteed to be called in a blocking metadata fn, which ensures
// that metadata does not load the tps we are changing. Basically, we ensure
// everything w.r.t. consuming is at a standstill.
func (cl *Client) purgeConsumeTopics(topics []string) {
c := &cl.consumer

if c.g == nil && c.d == nil {
return
}

purgeAssignments := make(map[string]map[int32]Offset, len(topics))
for _, topic := range topics {
purgeAssignments[topic] = nil
}

c.mu.Lock()
defer c.mu.Unlock()

// The difference for groups is we need to lock the group and there is
// a slight type difference in g.using vs d.using.
if c.g != nil {
c.g.mu.Lock()
defer c.g.mu.Unlock()
c.g.tps.purgeTopics(topics)
c.assignPartitions(purgeAssignments, assignPurgeMatching, c.g.tps, fmt.Sprintf("purge of %v requested", topics))
for _, topic := range topics {
delete(c.g.using, topic)
delete(c.g.reSeen, topic)
}
c.g.rejoin("rejoin from PurgeTopicsFromClient")
} else {
c.d.tps.purgeTopics(topics)
c.assignPartitions(purgeAssignments, assignPurgeMatching, c.d.tps, fmt.Sprintf("purge of %v requested", topics))
for _, topic := range topics {
delete(c.d.using, topic)
delete(c.d.reSeen, topic)
}
}
}

// assignHow controls how assignPartitions operates.
type assignHow int8

@@ -532,6 +572,8 @@ const (
// meaningless / a dummy offset.
assignInvalidateMatching

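// assignPurgeMatching behaves like assignInvalidateMatching, but matches
// at the topic level: any partition whose topic is in the input is
// unassigned and its cursor removed.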
assignPurgeMatching

// The counterpart to assignInvalidateMatching, assignSetMatching
// resets all matching partitions to the specified offset / epoch.
assignSetMatching
Expand All @@ -545,6 +587,8 @@ func (h assignHow) String() string {
return "unassigning everything"
case assignInvalidateMatching:
return "unassigning any currently assigned matching partition that is in the input"
case assignPurgeMatching:
return "unassigning and purging any partition matching the input topics"
case assignSetMatching:
return "reassigning any currently assigned matching partition to the input"
}
@@ -595,9 +639,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
}
var session *consumerSession
var loadOffsets listOrEpochLoads
if how == assignInvalidateAll {
tps = nil
}

defer func() {
if session == nil { // if nil, we stopped the session
session = c.startNewSession(tps)
@@ -633,7 +675,10 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
shouldKeep = false
} else { // invalidateMatching or setMatching
if assignTopic, ok := assignments[usedCursor.topic]; ok {
if assignPart, ok := assignTopic[usedCursor.partition]; ok {
if how == assignPurgeMatching { // topic level
usedCursor.source.removeCursor(usedCursor)
shouldKeep = false
} else if assignPart, ok := assignTopic[usedCursor.partition]; ok {
if how == assignInvalidateMatching {
usedCursor.unset()
shouldKeep = false
@@ -669,7 +714,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
case assignSetMatching:
// We had not yet loaded this partition, so there is
// nothing to set, and we keep everything.
case assignInvalidateMatching:
case assignInvalidateMatching, assignPurgeMatching:
loadOffsets.keepFilter(func(t string, p int32) bool {
if assignTopic, ok := assignments[t]; ok {
if _, ok := assignTopic[p]; ok {
Expand All @@ -683,7 +728,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how

// This assignment could contain nothing (for the purposes of
// invalidating active fetches), so we only do this if needed.
if len(assignments) == 0 || how == assignInvalidateMatching || how == assignSetMatching {
if len(assignments) == 0 || how == assignInvalidateMatching || how == assignPurgeMatching || how == assignSetMatching {
return
}

39 changes: 28 additions & 11 deletions pkg/kgo/consumer_group.go
@@ -118,6 +118,9 @@ type groupConsumer struct {
// autocommit does not cancel the user's manual commit.
blockAuto bool

// Set when we first start managing the group, so that we only ever start
// the manage goroutine once.
managing bool

dying bool // set when closing, read in findNewAssignments
}

Expand Down Expand Up @@ -222,7 +225,7 @@ func (c *consumer) initGroup() {
// For non-regex topics, we explicitly ensure they exist for loading
// metadata. This is of no impact if we are *also* consuming via regex,
// but that is no problem.
if len(g.cfg.topics) > 0 {
if len(g.cfg.topics) > 0 && !g.cfg.regex {
topics := make([]string, 0, len(g.cfg.topics))
for topic := range g.cfg.topics {
topics = append(topics, topic)
@@ -341,7 +344,7 @@ func (g *groupConsumer) leave() (wait func()) {
g.mu.Lock()
wasDead := g.dying
g.dying = true
wasManaging := len(g.using) > 0
wasManaging := g.managing
g.mu.Unlock()

done := make(chan struct{})
@@ -454,8 +457,7 @@ const (
// (1) if revoking lost partitions from a prior session (i.e., after sync),
// this revokes the passed in lost
// (2) if revoking at the end of a session, this revokes topics that the
// consumer is no longer interested in consuming (TODO, actually, only
// once we allow subscriptions to change without leaving the group).
// consumer is no longer interested in consuming
//
// Lastly, for cooperative consumers, this must selectively delete what was
// lost from the uncommitted map.
@@ -498,11 +500,24 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi

case revokeThisSession:
// lost is nil for cooperative assigning. Instead, we determine
// lost by finding subscriptions we are no longer interested in.
// lost by finding subscriptions we are no longer interested
// in. This would be from a user's PurgeTopicsFromClient call.
//
// TODO only relevant when we allow reassigning with the same
// group to change subscriptions (also we must delete the
// unused partitions from nowAssigned).
// We just paused metadata, but purging triggers a rebalance
// which causes a new metadata request -- in short, this could
// be concurrent with a metadata findNewAssignments, so we
// lock.
g.mu.Lock()
for topic, partitions := range g.nowAssigned {
if _, exists := g.using[topic]; !exists {
if lost == nil {
lost = make(map[string][]int32)
}
lost[topic] = partitions
delete(g.nowAssigned, topic)
}
}
g.mu.Unlock()
}

if len(lost) > 0 {
@@ -547,7 +562,9 @@ func (g *groupConsumer) revoke(stage revokeStage, lost map[string][]int32, leavi
return
}

defer g.rejoin("cooperative rejoin after revoking what we lost") // cooperative consumers rejoin after revoking what they lost
if stage != revokeThisSession { // cooperative consumers rejoin after revoking what they lost
defer g.rejoin("cooperative rejoin after revoking what we lost from a rebalance")
}

// The block below deletes everything lost from our uncommitted map.
// All commits should be **completed** by the time this runs. An async
Expand Down Expand Up @@ -1389,12 +1406,12 @@ func (g *groupConsumer) findNewAssignments() {
return
}

wasManaging := len(g.using) != 0
for topic, change := range toChange {
g.using[topic] += change.delta
}

if !wasManaging {
if !g.managing {
g.managing = true
go g.manage()
return
}
3 changes: 3 additions & 0 deletions pkg/kgo/errors.go
@@ -149,6 +149,9 @@ var (
// Returned when trying to produce a record outside of a transaction.
errNotInTransaction = errors.New("cannot produce record transactionally if not in a transaction")

// Returned for all buffered produce records when a user purges topics.
errPurged = errors.New("topic purged while buffered")

//////////////
// EXTERNAL //
//////////////
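Records that are still buffered when their topic is purged are failed with
this error through their promises; a sketch of how a caller observes that
(the topic name and log handling are illustrative):

package kgoexample

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

// produceThenPurge shows how a buffered record's promise observes a purge:
// errPurged is unexported, so callers see it only as a failed produce with
// the "topic purged while buffered" message.
func produceThenPurge(ctx context.Context, cl *kgo.Client, topic string) {
	cl.Produce(ctx, &kgo.Record{Topic: topic, Value: []byte("v")}, func(r *kgo.Record, err error) {
		if err != nil {
			log.Printf("record for %s failed: %v", r.Topic, err)
		}
	})
	// Purging now fails the record above via its promise if it has not
	// yet been successfully produced.
	cl.PurgeTopicsFromClient(topic)
}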
54 changes: 54 additions & 0 deletions pkg/kgo/producer.go
@@ -102,6 +102,60 @@ func (p *producer) init(cl *Client) {
})
}

func (cl *Client) purgeProduceTopics(topics []string) {
p := &cl.producer

p.topicsMu.Lock()
defer p.topicsMu.Unlock()

p.unknownTopicsMu.Lock()
for _, topic := range topics {
if unknown, exists := p.unknownTopics[topic]; exists {
delete(p.unknownTopics, topic)
close(unknown.wait)
cl.failUnknownTopicRecords(unknown, errPurged)
}
}
p.unknownTopicsMu.Unlock()

toStore := p.topics.clone()
defer p.topics.storeData(toStore)

for _, topic := range topics {
d := toStore.loadTopic(topic)
if d == nil {
continue
}
delete(toStore, topic)
for _, p := range d.partitions {
r := p.records

// First we set purged, so that anything in the process
// of being buffered will immediately fail when it goes
// to buffer.
r.mu.Lock()
r.purged = true
r.mu.Unlock()

// Now we remove from the sink. When we do, the recBuf
// is effectively abandoned. Any active produces may
// finish before we fail the records; if they finish
// after, the records no longer belong to the batch,
// but they may have been produced anyway. This is the
// duplicate risk a user runs when purging.
r.sink.removeRecBuf(r)

// Once abandoned, we now need to fail anything that
// was buffered.
go func() {
r.mu.Lock()
defer r.mu.Unlock()
r.failAllRecords(errPurged)
}()
}
}
}

func (p *producer) isAborting() bool { return atomic.LoadInt32(&p.aborting) > 0 }

func noPromise(*Record, error) {}
9 changes: 9 additions & 0 deletions pkg/kgo/sink.go
@@ -1036,6 +1036,10 @@ type recBuf struct {
//
// It is always cleared on metadata update.
failing bool

// Only possibly set in PurgeTopicsFromClient, this is used to fail anything that
// was in the process of being buffered.
purged bool
}

// bufferRecord usually buffers a record, but does not if abortOnNewBatch is
@@ -1052,6 +1056,11 @@ func (recBuf *recBuf) bufferRecord(pr promisedRec, abortOnNewBatch bool) bool {
// (see Shopify/sarama#1455)
pr.Timestamp = time.Now().Truncate(time.Millisecond)

if recBuf.purged {
recBuf.cl.finishRecordPromise(pr, errPurged)
return true
}

var (
newBatch = true
onDrainBatch = recBuf.batchDrainIdx == len(recBuf.batches)
18 changes: 18 additions & 0 deletions pkg/kgo/topics_and_partitions.go
@@ -206,6 +206,24 @@ func (t *topicsPartitions) ensureTopics(topics []string) topicsPartitionsData {
return current
}

// Opposite of ensureTopics, this purges the input topics and *does* store.
func (t *topicsPartitions) purgeTopics(topics []string) {
var cloned bool
current := t.load()
for _, topic := range topics {
if _, exists := current[topic]; exists {
if !cloned {
current = t.clone()
cloned = true
}
delete(current, topic)
}
}
if cloned {
t.storeData(current)
}
}

// Updates the topic partitions data atomic value.
//
// If this is the first time seeing partitions, we do processing of unknown
