Skip to content

Commit

Permalink
add support for ListTransactions, minor DescribeTransactions change
Browse files Browse the repository at this point in the history
In describe transactions, the response field should be
TransactionStates, not TransactionalStates. Since this is in an
"unreleased" API, I'm ok with breaking this.

List is defined and handled just like sharded list groups.
  • Loading branch information
twmb committed Mar 20, 2021
1 parent 3e6bcc3 commit b531098
Show file tree
Hide file tree
Showing 6 changed files with 517 additions and 25 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,5 @@ a protocol is supported by code generation.
- [KIP-590](https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller) (support for forwarding admin requests; 2.7.0)
- [KIP-595](https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum) (new APIs for raft protocol; 2.7.0)
- [KIP-599](https://cwiki.apache.org/confluence/display/KAFKA/KIP-599%3A+Throttle+Create+Topic%2C+Create+Partition+and+Delete+Topic+Operations) (throttle create/delete topic/partition; 2.7.0)
- [KIP-664](https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions) (describe producers, describe / list transactions; mostly 2.8.0 [write txn markers missing])
- [KIP-700](https://cwiki.apache.org/confluence/display/KAFKA/KIP-700%3A+Add+Describe+Cluster+API) (describe cluster; 2.8.0)
2 changes: 1 addition & 1 deletion generate/definitions/65_describe_transactions
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ DescribeTransactionsRequest => key 65, max version 0, flexible v0+
// DescribeTransactionsResponse is a response to a DescribeTransactionsRequest.
DescribeTransactionsResponse =>
ThrottleMillis
TransactionalStates: [=>]
TransactionStates: [=>]
// A potential error code for describing this transaction.
//
// NOT_COORDINATOR is returned if the broker receiving this transactional
Expand Down
36 changes: 36 additions & 0 deletions generate/definitions/66_list_transactions
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// For KIP-664, ListTransactionsRequest lists transactions.
ListTransactionsRequest => key 66, max version 0, flexible v0+
// The transaction states to filter by: if empty, all transactions are
// returned; if non-empty, then only transactions matching one of the
// filtered states will be returned.
//
// For a list of valid states, see the TransactionState enum.
StateFilters: [string]
// The producer IDs to filter by: if empty, all transactions will be
// returned; if non-empty, only transactions which match one of the filtered
// producer IDs will be returned
ProducerIDFilters: [int64]

// ListTransactionsResponse is a response to a ListTransactionsRequest.
ListTransactionsResponse =>
ThrottleMillis
// A potential error code for the listing,
//
// COORDINATOR_LOAD_IN_PROGRESS is returned if the coordinator is loading.
//
// COORDINATOR_NOT_AVAILABLE is returned if the coordinator receiving this
// request is shutting down.
ErrorCode: int16
// Set of state filters provided in the request which were unknown to the
// transaction coordinator.
UnknownStateFilters: [string]
// TransactionStates contains all transactions that were matched for listing
// in the request. The response elides transactions that the user does not have
// permission to describe (DESCRIBE on TRANSACTIONAL_ID for the transaction).
TransactionStates: [=>]
// The transactional ID being used.
TransactionalID: string
// The producer ID of the producer.
ProducerID: int64
// The current transaction state of the producer.
TransactionState: string
50 changes: 46 additions & 4 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ func (cl *Client) Close() {
// IncrementalAlterConfigs
// DescribeProducers
// DescribeTransactions
// ListTransactions
//
// In short, this method tries to do the correct thing depending on what type
// of request is being issued.
Expand Down Expand Up @@ -661,7 +662,8 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo
*kmsg.DeleteGroupsRequest, // key 42
*kmsg.IncrementalAlterConfigsRequest, // key 44
*kmsg.DescribeProducersRequest, // key 61
*kmsg.DescribeTransactionsRequest: // key 65
*kmsg.DescribeTransactionsRequest, // key 65
*kmsg.ListTransactionsRequest: // key 66
return cl.handleShardedReq(ctx, req)
}

Expand Down Expand Up @@ -1249,6 +1251,8 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res
sharder = &describeProducersSharder{cl}
case *kmsg.DescribeTransactionsRequest:
sharder = &describeTransactionsSharder{cl}
case *kmsg.ListTransactionsRequest:
sharder = &listTransactionsSharder{cl}
}

// If a request fails, we re-shard it (in case it needs to be split
Expand Down Expand Up @@ -2384,8 +2388,8 @@ func (cl *describeTransactionsSharder) shard(ctx context.Context, kreq kmsg.Requ

func (cl *describeTransactionsSharder) onResp(kresp kmsg.Response) { // cleanup any stale coordinators
resp := kresp.(*kmsg.DescribeTransactionsResponse)
for i := range resp.TransactionalStates {
txnState := &resp.TransactionalStates[i]
for i := range resp.TransactionStates {
txnState := &resp.TransactionStates[i]
err := kerr.ErrorForCode(txnState.ErrorCode)
cl.maybeDeleteStaleCoordinator(txnState.TransactionalID, coordinatorTypeTxn, err)
}
Expand All @@ -2398,6 +2402,44 @@ func (cl *describeTransactionsSharder) merge(sresps []ResponseShard) (kmsg.Respo
resp := kresp.(*kmsg.DescribeTransactionsResponse)
merged.Version = resp.Version
merged.ThrottleMillis = resp.ThrottleMillis
merged.TransactionalStates = append(merged.TransactionalStates, resp.TransactionalStates...)
merged.TransactionStates = append(merged.TransactionStates, resp.TransactionStates...)
})
}

// handles sharding ListTransactionsRequest
type listTransactionsSharder struct{ *Client }

func (cl *listTransactionsSharder) shard(ctx context.Context, kreq kmsg.Request) ([]issueShard, bool, error) {
req := kreq.(*kmsg.ListTransactionsRequest)
return cl.allBrokersShardedReq(ctx, func() kmsg.Request {
dup := *req
return &dup
})
}

func (cl *listTransactionsSharder) onResp(kresp kmsg.Response) {} // nothing to do

func (cl *listTransactionsSharder) merge(sresps []ResponseShard) (kmsg.Response, error) {
merged := new(kmsg.ListTransactionsResponse)

unknownStates := make(map[string]struct{})

firstErr := firstErrMerger(sresps, func(kresp kmsg.Response) {
resp := kresp.(*kmsg.ListTransactionsResponse)
merged.Version = resp.Version
merged.ThrottleMillis = resp.ThrottleMillis
if merged.ErrorCode == 0 {
merged.ErrorCode = resp.ErrorCode
}
for _, state := range resp.UnknownStateFilters {
unknownStates[state] = struct{}{}
}
merged.TransactionStates = append(merged.TransactionStates, resp.TransactionStates...)
})
for unknownState := range unknownStates {
merged.UnknownStateFilters = append(merged.UnknownStateFilters, unknownState)
}

return merged, firstErr

}
Loading

0 comments on commit b531098

Please sign in to comment.