-
Notifications
You must be signed in to change notification settings - Fork 659
/
idempotent_producer_example.go
179 lines (150 loc) · 4.88 KB
/
idempotent_producer_example.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
/**
* Copyright 2019 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Idempotent Producer example.
//
// The idempotent producer provides strict ordering and
// exactly-once producing guarantees.
//
// From the application developer's perspective, the only difference
// from a standard producer is the enabling of the feature by setting
// the `enable.idempotence` configuration property to true, and
// handling fatal errors (Error.IsFatal()) which are raised when the
// idempotent guarantees can't be satisfied.
package main
import (
"fmt"
"os"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
// run keeps the produce loop in main going. It is read and written only by
// the main goroutine: the event-serving goroutine reports fatal errors over a
// channel rather than touching this flag, so no synchronization is required.
var run = true

// main produces messages to <topic> via <bootstrap-servers> with the
// idempotent producer enabled. It keeps producing until a fatal error is
// reported on the producer's Events() channel, then flushes outstanding
// messages, shuts the producer down and exits with status 1 if a fatal error
// was raised (status 0 otherwise).
func main() {
	if len(os.Args) != 3 {
		fmt.Fprintf(os.Stderr, "Usage: %s <bootstrap-servers> <topic>\n",
			os.Args[0])
		os.Exit(1)
	}

	bootstrapServers := os.Args[1]
	topic := os.Args[2]

	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": bootstrapServers,
		// Enable the Idempotent Producer
		"enable.idempotence": true})
	if err != nil {
		fmt.Printf("Failed to create producer: %s\n", err)
		os.Exit(1)
	}

	// For signalling termination from main to go-routine
	termChan := make(chan bool, 1)
	// For signalling that termination is done from go-routine to main
	doneChan := make(chan bool)
	// For signalling a fatal error from go-routine to main.
	// Buffered so the go-routine never blocks on it: it must keep serving
	// delivery reports while main is flushing.
	fatalChan := make(chan struct{}, 1)

	// Go routine for serving the events channel for delivery reports and error events.
	go func() {
		// Closing doneChan tells main that this go-routine has finished.
		defer close(doneChan)

		for {
			select {
			case e := <-p.Events():
				switch ev := e.(type) {
				case *kafka.Message:
					// Message delivery report
					if ev.TopicPartition.Error != nil {
						fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error)
					} else {
						fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
							*ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
					}

				case kafka.Error:
					// Generic client instance-level errors, such as
					// broker connection failures, authentication issues, etc.
					//
					// These errors should generally be considered informational
					// as the underlying client will automatically try to
					// recover from any errors encountered, the application
					// does not need to take action on them.
					//
					// But with idempotence enabled, truly fatal errors can
					// be raised when the idempotence guarantees can't be
					// satisfied, these errors are identified by
					// `ev.IsFatal()`.
					if ev.IsFatal() {
						// Fatal error handling.
						//
						// When a fatal error is detected by the producer
						// instance, it will emit kafka.Error event (with
						// IsFatal()) set on the Events channel.
						//
						// Note:
						//   After a fatal error has been raised, any
						//   subsequent Produce*() calls will fail with
						//   the original error code.
						fmt.Printf("FATAL ERROR: %v: terminating\n", ev)

						// Non-blocking notify: a signal that is already
						// pending is enough, extra ones are dropped.
						select {
						case fatalChan <- struct{}{}:
						default:
						}
					} else {
						fmt.Printf("Error: %v\n", ev)
					}

				default:
					fmt.Printf("Ignored event: %s\n", ev)
				}

			case <-termChan:
				return
			}
		}
	}()

	msgcnt := 0
	for run {
		value := fmt.Sprintf("Go Idempotent Producer example, message #%d", msgcnt)

		// Produce message.
		// This is an asynchronous call, on success it will only
		// enqueue the message on the internal producer queue.
		// The actual delivery attempts to the broker are handled
		// by background threads.
		// Per-message delivery reports are emitted on the Events() channel,
		// see the go-routine above.
		err = p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(value),
		}, nil)
		if err != nil {
			fmt.Printf("Failed to produce message: %v\n", err)
		}

		msgcnt++

		// Since fatal errors can't easily be provoked in practice,
		// use the test API to trigger a fabricated error after some time.
		if msgcnt == 13 {
			p.TestFatalError(kafka.ErrOutOfOrderSequenceNumber, "Testing fatal errors")
		}

		// Pace the loop at one message per 500ms, but stop right away
		// once the go-routine has reported a fatal error.
		select {
		case <-fatalChan:
			run = false
		case <-time.After(500 * time.Millisecond):
		}
	}

	// Clean termination to get delivery results
	// for all outstanding/in-transit/queued messages.
	fmt.Printf("Flushing outstanding messages\n")
	if remaining := p.Flush(15 * 1000); remaining > 0 {
		fmt.Printf("%d event(s) still un-flushed after flush timeout\n", remaining)
	}

	// signal termination to go-routine
	termChan <- true
	// wait for go-routine to terminate
	<-doneChan

	// Fetch the fatal error (if any) before closing the producer.
	fatalErr := p.GetFatalError()

	p.Close()

	// Exit application with an error (1) if there was a fatal error;
	// returning from main otherwise exits with status 0.
	if fatalErr != nil {
		os.Exit(1)
	}
}