@@ -152,25 +152,23 @@ static const test_data_t ldata[] = {
152
152
153
153
154
154
void run_cap (void *zctx, bool &term, string &read_source,
155
- int &cnt)
155
+ int &cnt, bool &should_read_control )
156
156
{
157
157
void *mock_cap = zmq_socket (zctx, ZMQ_SUB);
158
158
string source;
159
159
internal_event_t ev_int;
160
160
int block_ms = 200 ;
161
161
int i=0 ;
162
- static int proxy_finished_init = false ;
163
162
164
163
EXPECT_TRUE (NULL != mock_cap);
165
164
EXPECT_EQ (0 , zmq_connect (mock_cap, get_config (CAPTURE_END_KEY).c_str ()));
166
165
EXPECT_EQ (0 , zmq_setsockopt (mock_cap, ZMQ_SUBSCRIBE, " " , 0 ));
167
166
EXPECT_EQ (0 , zmq_setsockopt (mock_cap, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms)));
168
167
169
- if (!proxy_finished_init ) {
168
+ if (should_read_control ) {
170
169
zmq_msg_t msg;
171
170
zmq_msg_init (&msg);
172
- EXPECT_EQ (1 , zmq_msg_recv (&msg, mock_cap, 0 )); // Subscription message
173
- proxy_finished_init = true ;
171
+ EXPECT_NE (1 , zmq_msg_recv (&msg, mock_cap, 0 )); // Subscription message should be read by do_capture
174
172
}
175
173
176
174
while (!term) {
@@ -227,10 +225,10 @@ void run_pub(void *mock_pub, const string wr_source, internal_events_lst_t &lst)
227
225
}
228
226
}
229
227
230
-
231
228
TEST (eventd, proxy)
232
229
{
233
230
printf (" Proxy TEST started\n " );
231
+ bool should_read_control = false ;
234
232
bool term_sub = false ;
235
233
bool term_cap = false ;
236
234
string rd_csource, rd_source, wr_source (" hello" );
@@ -247,12 +245,12 @@ TEST(eventd, proxy)
247
245
/* Starting proxy */
248
246
EXPECT_EQ (0 , pxy->init ());
249
247
248
+ /* capture in a thread */
249
+ thread thrc (&run_cap, zctx, ref (term_cap), ref (rd_csource), ref (rd_cevts_sz), ref (should_read_control));
250
+
250
251
/* subscriber in a thread */
251
252
thread thr (&run_sub, zctx, ref (term_sub), ref (rd_source), ref (rd_evts), ref (rd_evts_sz));
252
253
253
- /* capture in a thread */
254
- thread thrc (&run_cap, zctx, ref (term_cap), ref (rd_csource), ref (rd_cevts_sz));
255
-
256
254
/* Init pub connection */
257
255
void *mock_pub = init_pub (zctx);
258
256
@@ -275,9 +273,6 @@ TEST(eventd, proxy)
275
273
}
276
274
this_thread::sleep_for (chrono::milliseconds (1000 ));
277
275
278
- delete pxy;
279
- pxy = NULL ;
280
-
281
276
term_sub = true ;
282
277
term_cap = true ;
283
278
@@ -287,6 +282,18 @@ TEST(eventd, proxy)
287
282
EXPECT_EQ (rd_cevts_sz, wr_evts.size ());
288
283
289
284
zmq_close (mock_pub);
285
+
286
+ /* Do control test */
287
+
288
+ should_read_control = true ;
289
+
290
+ /* capture in a thread */
291
+ thread thrcc (&run_cap, zctx, ref (term_cap), ref (rd_csource), ref (rd_cevts_sz), ref (should_read_control));
292
+
293
+ delete pxy;
294
+ pxy = NULL ;
295
+
296
+ thrcc.join ();
290
297
zmq_ctx_term (zctx);
291
298
292
299
/* Provide time for async proxy removal to complete */
@@ -295,7 +302,6 @@ TEST(eventd, proxy)
295
302
printf (" eventd_proxy is tested GOOD\n " );
296
303
}
297
304
298
-
299
305
TEST (eventd, capture)
300
306
{
301
307
printf (" Capture TEST started\n " );
@@ -329,9 +335,6 @@ TEST(eventd, capture)
329
335
/* Starting proxy */
330
336
EXPECT_EQ (0 , pxy->init ());
331
337
332
- /* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
333
- thread thr_sub (&run_sub, zctx, ref (term_sub), ref (sub_source), ref (sub_evts), ref (sub_evts_sz));
334
-
335
338
/* Create capture service */
336
339
capture_service *pcap = new capture_service (zctx, cache_max, &stats_instance);
337
340
@@ -341,6 +344,9 @@ TEST(eventd, capture)
341
344
/* Initialize the capture */
342
345
EXPECT_EQ (0 , pcap->set_control (INIT_CAPTURE));
343
346
347
+ /* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
348
+ thread thr_sub (&run_sub, zctx, ref (term_sub), ref (sub_source), ref (sub_evts), ref (sub_evts_sz));
349
+
344
350
EXPECT_TRUE (init_cache > 1 );
345
351
EXPECT_TRUE ((cache_max+3 ) < (int )ARRAY_SIZE (ldata));
346
352
@@ -473,9 +479,6 @@ TEST(eventd, captureCacheMax)
473
479
/* Starting proxy */
474
480
EXPECT_EQ (0 , pxy->init ());
475
481
476
- /* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
477
- thread thr_sub (&run_sub, zctx, ref (term_sub), ref (sub_source), ref (sub_evts), ref (sub_evts_sz));
478
-
479
482
/* Create capture service */
480
483
capture_service *pcap = new capture_service (zctx, cache_max, &stats_instance);
481
484
@@ -484,6 +487,9 @@ TEST(eventd, captureCacheMax)
484
487
485
488
EXPECT_TRUE (init_cache > 1 );
486
489
490
+ /* Run subscriber; Else publisher will drop events on floor, with no subscriber. */
491
+ thread thr_sub (&run_sub, zctx, ref (term_sub), ref (sub_source), ref (sub_evts), ref (sub_evts_sz));
492
+
487
493
/* Collect few serailized strings of events for startup cache */
488
494
for (int i=0 ; i < init_cache; ++i) {
489
495
internal_event_t ev (create_ev (ldata[i]));
@@ -595,6 +601,7 @@ TEST(eventd, service)
595
601
}
596
602
597
603
thread thread_service (&run_eventd_service);
604
+ this_thread::sleep_for (chrono::milliseconds (CAPTURE_SERVICE_POLLING_DURATION * CAPTURE_SERVICE_POLLING_RETRIES));
598
605
599
606
/* Need client side service to interact with server side */
600
607
EXPECT_EQ (0 , service.init_client (zctx));
@@ -610,7 +617,7 @@ TEST(eventd, service)
610
617
string wr_source (" hello" );
611
618
612
619
/* Test service startup caching */
613
- event_serialized_lst_t evts_start, evts_read;
620
+ event_serialized_lst_t evts_start, evts_read, polled_events ;
614
621
615
622
for (int i=0 ; i<wr_sz; ++i) {
616
623
string evt_str;
@@ -624,15 +631,32 @@ TEST(eventd, service)
624
631
/* Publish events. */
625
632
run_pub (mock_pub, wr_source, wr_evts);
626
633
627
- /* Published events must have been captured. Give a pause, to ensure sent. */
628
- this_thread::sleep_for (chrono::milliseconds (200 ));
634
+ int max_polling_duration = 2000 ;
635
+ int polling_interval = 100 ;
636
+ auto poll_start_ts = chrono::steady_clock::now ();
637
+
638
+ while (true ) {
639
+ auto current_ts = chrono::steady_clock::now ();
640
+ if (chrono::duration_cast<chrono::milliseconds>(current_ts - poll_start_ts).count () >= max_polling_duration) {
641
+ break ;
642
+ }
643
+ event_serialized_lst_t read_events;
644
+ service.cache_read (read_events);
645
+ polled_events.insert (polled_events.end (), read_events.begin (), read_events.end ());
646
+ if (!read_events.empty ()) {
647
+ break ;
648
+ }
649
+ this_thread::sleep_for (chrono::milliseconds (polling_interval));
650
+ }
629
651
630
652
EXPECT_EQ (0 , service.cache_stop ());
631
653
632
- /* Read the cache; expect wr_sz events */
654
+ /* Read remaining events in cache, if any */
633
655
EXPECT_EQ (0 , service.cache_read (evts_read));
634
656
635
- EXPECT_EQ (evts_read, evts_start);
657
+ polled_events.insert (polled_events.end (), evts_read.begin (), evts_read.end ());
658
+
659
+ EXPECT_EQ (polled_events, evts_start);
636
660
637
661
zmq_close (mock_pub);
638
662
}
0 commit comments