2
2
#include < csp/adapters/kafka/KafkaConsumer.h>
3
3
#include < csp/adapters/kafka/KafkaPublisher.h>
4
4
#include < csp/adapters/kafka/KafkaSubscriber.h>
5
- #include < csp/engine/Dictionary.h>
6
5
#include < csp/core/Platform.h>
6
+ #include < csp/engine/Dictionary.h>
7
7
8
8
#include < iostream>
9
9
#include < librdkafka/rdkafkacpp.h>
10
10
11
11
namespace csp
12
12
{
13
13
14
- INIT_CSP_ENUM ( csp::adapters::kafka::KafkaStatusMessageType,
15
- " OK" ,
16
- " MSG_DELIVERY_FAILED" ,
17
- " MSG_SEND_ERROR" ,
18
- " MSG_RECV_ERROR"
19
- );
14
+ INIT_CSP_ENUM ( csp::adapters::kafka::KafkaStatusMessageType, " OK" , " MSG_DELIVERY_FAILED" , " MSG_SEND_ERROR" ,
15
+ " MSG_RECV_ERROR" );
20
16
21
17
}
22
18
@@ -26,59 +22,68 @@ namespace csp::adapters::kafka
26
22
class DeliveryReportCb : public RdKafka ::DeliveryReportCb
27
23
{
28
24
public:
29
- DeliveryReportCb ( KafkaAdapterManager * mgr ) : m_adapterManager( mgr )
25
+ DeliveryReportCb ( KafkaAdapterManager * mgr )
26
+ : m_adapterManager( mgr )
30
27
{
31
28
}
32
29
33
- void dr_cb ( RdKafka::Message &message ) final
30
+ void dr_cb ( RdKafka::Message & message ) final
34
31
{
35
32
/* If message.err() is non-zero the message delivery failed permanently
36
33
* for the message. */
37
34
if ( message.err () )
38
35
{
39
- std::string msg = " KafkaPublisher: Message delivery failed for topic " + message.topic_name () + " . Failure: " + message.errstr ();
40
- m_adapterManager -> pushStatus ( StatusLevel::ERROR, KafkaStatusMessageType::MSG_DELIVERY_FAILED, msg );
36
+ std::string msg = " KafkaPublisher: Message delivery failed for topic " + message.topic_name ()
37
+ + " . Failure: " + message.errstr ();
38
+ m_adapterManager->pushStatus ( StatusLevel::ERROR, KafkaStatusMessageType::MSG_DELIVERY_FAILED, msg );
41
39
}
42
40
}
41
+
43
42
private:
44
43
KafkaAdapterManager * m_adapterManager;
45
44
};
46
45
47
46
class EventCb : public RdKafka ::EventCb
48
47
{
49
48
public:
50
- EventCb ( KafkaAdapterManager * mgr ) : m_adapterManager( mgr ) {}
49
+ EventCb ( KafkaAdapterManager * mgr )
50
+ : m_adapterManager( mgr )
51
+ {
52
+ }
51
53
52
54
void event_cb ( RdKafka::Event & event ) override
53
55
{
54
56
if ( event.type () == RdKafka::Event::EVENT_LOG )
55
57
{
56
58
if ( event.severity () < RdKafka::Event::EVENT_SEVERITY_NOTICE )
57
59
{
58
- std::string errmsg = " KafkaConsumer: error " + RdKafka::err2str ( ( RdKafka::ErrorCode ) event.err () ) + " . Reason: " + event.str ();
59
- m_adapterManager -> pushStatus ( StatusLevel::ERROR, KafkaStatusMessageType::GENERIC_ERROR, errmsg );
60
+ std::string errmsg = " KafkaConsumer: error " + RdKafka::err2str ( (RdKafka::ErrorCode)event.err () )
61
+ + " . Reason: " + event.str ();
62
+ m_adapterManager->pushStatus ( StatusLevel::ERROR, KafkaStatusMessageType::GENERIC_ERROR, errmsg );
60
63
}
61
64
}
62
65
else if ( event.type () == RdKafka::Event::EVENT_ERROR )
63
66
{
64
- // We shutdown the app if its a fatal error OR if its an authentication issue which has plagued users multiple times
67
+ // We shutdown the app if its a fatal error OR if its an authentication issue which has plagued users
68
+ // multiple times
65
69
if ( event.fatal () || event.err () == RdKafka::ErrorCode::ERR__AUTHENTICATION )
66
- m_adapterManager -> forceShutdown ( RdKafka::err2str ( ( RdKafka::ErrorCode ) event.err () ) + event.str () );
70
+ m_adapterManager-> forceShutdown ( RdKafka::err2str ( (RdKafka::ErrorCode) event.err () ) + event.str () );
67
71
}
68
72
}
69
73
70
74
private:
71
75
KafkaAdapterManager * m_adapterManager;
72
76
};
73
77
74
- KafkaAdapterManager::KafkaAdapterManager ( csp::Engine * engine, const Dictionary & properties ) : AdapterManager( engine ),
75
- m_consumerIdx ( 0 ),
76
- m_producerPollThreadActive( false )
78
+ KafkaAdapterManager::KafkaAdapterManager ( csp::Engine * engine, const Dictionary & properties )
79
+ : AdapterManager( engine )
80
+ , m_consumerIdx( 0 )
81
+ , m_producerPollThreadActive( false )
77
82
{
78
- m_maxThreads = properties.get <uint64_t >( " max_threads" );
83
+ m_maxThreads = properties.get <uint64_t >( " max_threads" );
79
84
m_pollTimeoutMs = properties.get <TimeDelta>( " poll_timeout" ).asMilliseconds ();
80
85
81
- m_eventCb = std::make_unique<EventCb>( this );
86
+ m_eventCb = std::make_unique<EventCb>( this );
82
87
m_producerCb = std::make_unique<DeliveryReportCb>( this );
83
88
84
89
std::string errstr;
@@ -90,19 +95,19 @@ KafkaAdapterManager::KafkaAdapterManager( csp::Engine * engine, const Dictionary
90
95
setConfProperties ( m_consumerConf.get (), *properties.get <DictionaryPtr>( " rd_kafka_consumer_conf_properties" ) );
91
96
if ( properties.exists ( " start_offset" ) )
92
97
{
93
- // used later in start since we need starttime
98
+ // used later in start since we need starttime
94
99
m_startOffsetProperty = properties.getUntypedValue ( " start_offset" );
95
100
}
96
101
97
- if ( m_consumerConf -> set ( " event_cb" , m_eventCb.get (), errstr ) != RdKafka::Conf::CONF_OK )
102
+ if ( m_consumerConf-> set ( " event_cb" , m_eventCb.get (), errstr ) != RdKafka::Conf::CONF_OK )
98
103
CSP_THROW ( RuntimeException, " Failed to set consumer error cb: " << errstr );
99
104
100
105
m_producerConf.reset ( RdKafka::Conf::create ( RdKafka::Conf::CONF_GLOBAL ) );
101
106
setConfProperties ( m_producerConf.get (), rdKafkaProperties );
102
107
setConfProperties ( m_producerConf.get (), *properties.get <DictionaryPtr>( " rd_kafka_producer_conf_properties" ) );
103
- if ( m_producerConf -> set ( " dr_cb" , m_producerCb.get (), errstr ) != RdKafka::Conf::CONF_OK )
108
+ if ( m_producerConf-> set ( " dr_cb" , m_producerCb.get (), errstr ) != RdKafka::Conf::CONF_OK )
104
109
CSP_THROW ( RuntimeException, " Failed to set producer callback: " << errstr );
105
- if ( m_producerConf -> set ( " event_cb" , m_eventCb.get (), errstr ) != RdKafka::Conf::CONF_OK )
110
+ if ( m_producerConf-> set ( " event_cb" , m_eventCb.get (), errstr ) != RdKafka::Conf::CONF_OK )
106
111
CSP_THROW ( RuntimeException, " Failed to set producer error cb: " << errstr );
107
112
}
108
113
@@ -112,7 +117,7 @@ KafkaAdapterManager::~KafkaAdapterManager()
112
117
if ( m_producerPollThreadActive )
113
118
{
114
119
m_producerPollThreadActive = false ;
115
- m_producerPollThread -> join ();
120
+ m_producerPollThread-> join ();
116
121
}
117
122
}
118
123
@@ -122,9 +127,9 @@ void KafkaAdapterManager::setConfProperties( RdKafka::Conf * conf, const Diction
122
127
123
128
for ( auto it = properties.begin (); it != properties.end (); ++it )
124
129
{
125
- std::string key = it.key ();
130
+ std::string key = it.key ();
126
131
std::string value = properties.get <std::string>( key );
127
- if ( conf -> set ( key, value, errstr ) != RdKafka::Conf::CONF_OK )
132
+ if ( conf-> set ( key, value, errstr ) != RdKafka::Conf::CONF_OK )
128
133
CSP_THROW ( RuntimeException, " Failed to set property " << key << " : " << errstr );
129
134
}
130
135
}
@@ -134,18 +139,18 @@ void KafkaAdapterManager::forceShutdown( const std::string & err )
134
139
forceConsumerReplayComplete ();
135
140
try
136
141
{
137
- CSP_THROW ( RuntimeException, " Kafka fatal error. " + err );
142
+ CSP_THROW ( RuntimeException, " Kafka fatal error. " + err );
138
143
}
139
144
catch ( const RuntimeException & )
140
145
{
141
- rootEngine () -> shutdown ( std::current_exception () );
146
+ rootEngine ()-> shutdown ( std::current_exception () );
142
147
}
143
148
}
144
149
145
150
void KafkaAdapterManager::forceConsumerReplayComplete ()
146
151
{
147
152
for ( auto & consumer : m_consumerVector )
148
- consumer -> forceReplayCompleted ();
153
+ consumer-> forceReplayCompleted ();
149
154
}
150
155
151
156
void KafkaAdapterManager::start ( DateTime starttime, DateTime endtime )
@@ -155,29 +160,29 @@ void KafkaAdapterManager::start( DateTime starttime, DateTime endtime )
155
160
if ( !m_staticPublishers.empty () || !m_dynamicPublishers.empty () )
156
161
{
157
162
m_producer.reset ( RdKafka::Producer::create ( m_producerConf.get (), errstr ) );
158
- if ( !m_producer )
163
+ if ( !m_producer )
159
164
{
160
165
CSP_THROW ( RuntimeException, " Failed to create producer: " << errstr );
161
166
}
162
167
}
163
168
164
169
// start all consumers
165
170
for ( auto & it : m_consumerVector )
166
- it -> start ( starttime );
171
+ it-> start ( starttime );
167
172
168
173
// start all publishers
169
174
for ( auto & it : m_staticPublishers )
170
- it.second -> start ( m_producer );
175
+ it.second -> start ( m_producer );
171
176
172
177
for ( auto & it : m_dynamicPublishers )
173
- it -> start ( m_producer );
178
+ it-> start ( m_producer );
174
179
175
180
AdapterManager::start ( starttime, endtime );
176
181
177
182
if ( !m_staticPublishers.empty () || !m_dynamicPublishers.empty () )
178
183
{
179
184
m_producerPollThreadActive = true ;
180
- m_producerPollThread = std::make_unique<std::thread>( [ this ](){ pollProducers (); } );
185
+ m_producerPollThread = std::make_unique<std::thread>( [this ]() { pollProducers (); } );
181
186
}
182
187
}
183
188
@@ -187,20 +192,20 @@ void KafkaAdapterManager::stop()
187
192
188
193
// stop all consumers
189
194
for ( auto & it : m_consumerVector )
190
- it -> stop ();
195
+ it-> stop ();
191
196
192
197
if ( m_producerPollThreadActive )
193
198
{
194
199
m_producerPollThreadActive = false ;
195
- m_producerPollThread -> join ();
200
+ m_producerPollThread-> join ();
196
201
}
197
202
198
203
// stop all publishers
199
204
for ( auto & it : m_staticPublishers )
200
- it.second -> stop ();
205
+ it.second -> stop ();
201
206
202
207
for ( auto & it : m_dynamicPublishers )
203
- it -> stop ();
208
+ it-> stop ();
204
209
205
210
m_staticPublishers.clear ();
206
211
m_dynamicPublishers.clear ();
@@ -218,44 +223,46 @@ void KafkaAdapterManager::pollProducers()
218
223
{
219
224
while ( m_producerPollThreadActive )
220
225
{
221
- m_producer -> poll ( 1000 );
226
+ m_producer-> poll ( 1000 );
222
227
}
223
228
224
229
try
225
230
{
226
231
while ( true )
227
232
{
228
- auto rc = m_producer -> flush ( 10000 );
233
+ auto rc = m_producer-> flush ( 10000 );
229
234
if ( !rc )
230
235
break ;
231
236
232
237
if ( rc && rc != RdKafka::ERR__TIMED_OUT )
233
- CSP_THROW ( RuntimeException, " KafkaProducer failed to flush pending msgs on shutdown: " << RdKafka::err2str ( rc ) );
238
+ CSP_THROW ( RuntimeException,
239
+ " KafkaProducer failed to flush pending msgs on shutdown: " << RdKafka::err2str ( rc ) );
234
240
}
235
241
}
236
242
catch ( ... )
237
243
{
238
- rootEngine () -> shutdown ( std::current_exception () );
244
+ rootEngine ()-> shutdown ( std::current_exception () );
239
245
}
240
246
}
241
247
242
- PushInputAdapter * KafkaAdapterManager::getInputAdapter ( CspTypePtr & type, PushMode pushMode, const Dictionary & properties )
248
+ PushInputAdapter * KafkaAdapterManager::getInputAdapter ( CspTypePtr & type, PushMode pushMode,
249
+ const Dictionary & properties )
243
250
{
244
- std::string topic = properties.get <std::string>( " topic" );
245
- std::string key = properties.get <std::string>( " key" );
246
- KafkaSubscriber * subscriber = this -> getSubscriber ( topic, key, properties );
247
- return subscriber -> getInputAdapter ( type, pushMode, properties );
251
+ std::string topic = properties.get <std::string>( " topic" );
252
+ std::string key = properties.get <std::string>( " key" );
253
+ KafkaSubscriber * subscriber = this -> getSubscriber ( topic, key, properties );
254
+ return subscriber-> getInputAdapter ( type, pushMode, properties );
248
255
}
249
256
250
257
OutputAdapter * KafkaAdapterManager::getOutputAdapter ( CspTypePtr & type, const Dictionary & properties )
251
258
{
252
259
std::string topic = properties.get <std::string>( " topic" );
253
260
try
254
261
{
255
- auto key = properties.get <std::string>( " key" );
256
- auto pair = TopicKeyPair ( topic, key );
257
- KafkaPublisher * publisher = this -> getStaticPublisher ( pair, properties );
258
- return publisher -> getOutputAdapter ( type, properties, key );
262
+ auto key = properties.get <std::string>( " key" );
263
+ auto pair = TopicKeyPair ( topic, key );
264
+ KafkaPublisher * publisher = this -> getStaticPublisher ( pair, properties );
265
+ return publisher-> getOutputAdapter ( type, properties, key );
259
266
}
260
267
catch ( TypeError & e )
261
268
{
@@ -264,8 +271,8 @@ OutputAdapter * KafkaAdapterManager::getOutputAdapter( CspTypePtr & type, const
264
271
for ( auto & it : key )
265
272
keyFields.emplace_back ( std::get<std::string>( it._data ) );
266
273
267
- KafkaPublisher * publisher = this -> getDynamicPublisher ( topic, properties );
268
- return publisher -> getOutputAdapter ( type, properties, keyFields );
274
+ KafkaPublisher * publisher = this -> getDynamicPublisher ( topic, properties );
275
+ return publisher-> getOutputAdapter ( type, properties, keyFields );
269
276
}
270
277
}
271
278
@@ -276,37 +283,38 @@ KafkaConsumer * KafkaAdapterManager::getConsumer( const std::string & topic, con
276
283
// If we have reached m_maxThreads, then round-robin the topic onto a consumer (and insert it into the map)
277
284
if ( m_consumerMap.find ( topic ) != m_consumerMap.end () )
278
285
{
279
- return m_consumerMap[ topic ].get ();
286
+ return m_consumerMap[topic].get ();
280
287
}
281
288
if ( m_consumerVector.size () < m_maxThreads )
282
289
{
283
290
auto consumer = std::make_shared<KafkaConsumer>( this , properties );
284
291
m_consumerVector.emplace_back ( consumer );
285
292
m_consumerMap.emplace ( topic, consumer );
286
- return m_consumerMap[ topic ].get ();
293
+ return m_consumerMap[topic].get ();
287
294
}
288
295
289
- auto consumer = m_consumerVector[ m_consumerIdx++ ];
296
+ auto consumer = m_consumerVector[m_consumerIdx++];
290
297
m_consumerMap.emplace ( topic, consumer );
291
298
if ( m_consumerIdx >= m_maxThreads )
292
299
m_consumerIdx = 0 ;
293
300
return consumer.get ();
294
301
}
295
302
296
- KafkaSubscriber * KafkaAdapterManager::getSubscriber ( const std::string & topic, const std::string & key, const Dictionary & properties )
303
+ KafkaSubscriber * KafkaAdapterManager::getSubscriber ( const std::string & topic, const std::string & key,
304
+ const Dictionary & properties )
297
305
{
298
306
auto pair = TopicKeyPair ( topic, key );
299
- auto rv = m_subscribers.emplace ( pair, nullptr );
307
+ auto rv = m_subscribers.emplace ( pair, nullptr );
300
308
301
309
if ( rv.second )
302
310
{
303
311
std::unique_ptr<KafkaSubscriber> subscriber ( new KafkaSubscriber ( this , properties ) );
304
- rv.first -> second = std::move ( subscriber );
312
+ rv.first -> second = std::move ( subscriber );
305
313
306
- this -> getConsumer ( topic, properties ) -> addSubscriber ( topic, key, rv.first -> second.get () );
314
+ this -> getConsumer ( topic, properties )-> addSubscriber ( topic, key, rv.first -> second .get () );
307
315
}
308
316
309
- return rv.first -> second.get ();
317
+ return rv.first -> second .get ();
310
318
}
311
319
312
320
// for static (string) keys, we create one publisher instance per <topic, key> pair
@@ -317,10 +325,10 @@ KafkaPublisher * KafkaAdapterManager::getStaticPublisher( const TopicKeyPair & p
317
325
if ( rv.second )
318
326
{
319
327
std::unique_ptr<KafkaPublisher> publisher ( new KafkaPublisher ( this , properties, pair.first ) );
320
- rv.first -> second = std::move ( publisher );
328
+ rv.first -> second = std::move ( publisher );
321
329
}
322
330
323
- KafkaPublisher * p = rv.first -> second.get ();
331
+ KafkaPublisher * p = rv.first -> second .get ();
324
332
return p;
325
333
}
326
334
@@ -332,4 +340,4 @@ KafkaPublisher * KafkaAdapterManager::getDynamicPublisher( const std::string & t
332
340
return p;
333
341
}
334
342
335
- }
343
+ } // namespace csp::adapters::kafka
0 commit comments