@@ -79,11 +79,10 @@ def graph(count: int):
7979 }
8080
8181 topic = f"test.metadata.{ os .getpid ()} "
82- _precreate_topic (topic )
8382 subKey = "foo"
8483 pubKey = ["mapped_a" , "mapped_b" , "mapped_c" ]
8584
86- c = csp .count (csp .timer (timedelta (seconds = 0. 1 )))
85+ c = csp .count (csp .timer (timedelta (seconds = 1 )))
8786 t = csp .sample (c , csp .const ("foo" ))
8887
8988 pubStruct = MetaPubData .collectts (
@@ -104,22 +103,27 @@ def graph(count: int):
104103 )
105104
106105 csp .add_graph_output ("sub_data" , sub_data )
107- # csp.print(' sub' , sub_data)
106+ csp .print (" sub" , sub_data )
108107 # Wait for at least count ticks and until we get a live tick
109- done_flag = csp .count (sub_data ) >= count
110- done_flag = csp .and_ (done_flag , sub_data .mapped_live is True )
108+ done_flag = csp .and_ (csp .count (sub_data ) >= count , sub_data .mapped_live == True ) # noqa: E712
111109 stop = csp .filter (done_flag , done_flag )
112110 csp .stop_engine (stop )
113111
114- count = 5
115- results = csp .run (graph , count , starttime = datetime .utcnow (), endtime = timedelta (seconds = 30 ), realtime = True )
112+ # warm up the topic
113+ results = csp .run (graph , 1 , starttime = datetime .utcnow (), endtime = timedelta (seconds = 3 ), realtime = True )
114+
115+ # now send some live in
116+ results = csp .run (graph , 5 , starttime = datetime .utcnow (), endtime = timedelta (seconds = 20 ), realtime = True )
116117 assert len (results ["sub_data" ]) >= 5
117118 print (results )
118119 for result in results ["sub_data" ]:
119120 assert result [1 ].mapped_partition >= 0
120121 assert result [1 ].mapped_offset >= 0
121122 assert result [1 ].mapped_live is not None
122123 assert result [1 ].mapped_timestamp < datetime .utcnow ()
124+ # first record should be non live
125+ assert results ["sub_data" ][0 ][1 ].mapped_live is False
126+ # last record should be live
123127 assert results ["sub_data" ][- 1 ][1 ].mapped_live
124128
125129 @pytest .mark .skipif (not os .environ .get ("CSP_TEST_KAFKA" ), reason = "Skipping kafka adapter tests" )
@@ -145,8 +149,7 @@ def graph(symbols: list, count: int):
145149 struct_field_map = {"b" : "b2" , "i" : "i2" , "d" : "d2" , "s" : "s2" , "dt" : "dt2" }
146150
147151 done_flags = []
148- topic = f"mktdata.{ os .getpid ()} "
149- _precreate_topic (topic )
152+
150153 for symbol in symbols :
151154 kafkaadapter .publish (msg_mapper , topic , symbol , b , field_map = "b" )
152155 kafkaadapter .publish (msg_mapper , topic , symbol , i , field_map = "i" )
@@ -183,10 +186,12 @@ def graph(symbols: list, count: int):
183186 stop = csp .filter (stop , stop )
184187 csp .stop_engine (stop )
185188
189+ topic = f"mktdata.{ os .getpid ()} "
190+ _precreate_topic (topic )
186191 symbols = ["AAPL" , "MSFT" ]
187192 count = 100
188193 results = csp .run (
189- graph , symbols , count , starttime = datetime .utcnow (), endtime = timedelta (seconds = 30 ), realtime = True
194+ graph , symbols , count , starttime = datetime .utcnow (), endtime = timedelta (seconds = 10 ), realtime = True
190195 )
191196 for symbol in symbols :
192197 pub = results [f"pall_{ symbol } " ]
@@ -212,7 +217,7 @@ def pub_graph():
212217 csp .stop_engine (stop )
213218 # csp.print('pub', struct)
214219
215- csp .run (pub_graph , starttime = datetime .utcnow (), endtime = timedelta (seconds = 30 ), realtime = True )
220+ csp .run (pub_graph , starttime = datetime .utcnow (), endtime = timedelta (seconds = 10 ), realtime = True )
216221
217222 # grab start/end times
218223 def get_times_graph ():
@@ -232,7 +237,7 @@ def get_times_graph():
232237 # csp.print('sub', data)
233238 # csp.print('status', kafkaadapter.status())
234239
235- all_data = csp .run (get_times_graph , starttime = datetime .utcnow (), endtime = timedelta (seconds = 30 ), realtime = True )[
240+ all_data = csp .run (get_times_graph , starttime = datetime .utcnow (), endtime = timedelta (seconds = 10 ), realtime = True )[
236241 "data"
237242 ]
238243 min_time = all_data [0 ][1 ].dt
@@ -258,7 +263,7 @@ def get_data(start_offset, expected_count):
258263 KafkaStartOffset .EARLIEST ,
259264 10 ,
260265 starttime = datetime .utcnow (),
261- endtime = timedelta (seconds = 30 ),
266+ endtime = timedelta (seconds = 10 ),
262267 realtime = True ,
263268 )["data" ]
264269 # print(res)
@@ -276,7 +281,7 @@ def get_data(start_offset, expected_count):
276281 assert len (res ) == 0
277282
278283 res = csp .run (
279- get_data , KafkaStartOffset .START_TIME , 10 , starttime = min_time , endtime = timedelta (seconds = 30 ), realtime = True
284+ get_data , KafkaStartOffset .START_TIME , 10 , starttime = min_time , endtime = timedelta (seconds = 10 ), realtime = True
280285 )["data" ]
281286 assert len (res ) == 10
282287
@@ -287,12 +292,12 @@ def get_data(start_offset, expected_count):
287292 stime = all_data [2 ][1 ].dt + timedelta (milliseconds = 1 )
288293 expected = [x for x in all_data if x [1 ].dt >= stime ]
289294 res = csp .run (
290- get_data , stime , len (expected ), starttime = datetime .utcnow (), endtime = timedelta (seconds = 30 ), realtime = True
295+ get_data , stime , len (expected ), starttime = datetime .utcnow (), endtime = timedelta (seconds = 10 ), realtime = True
291296 )["data" ]
292297 assert len (res ) == len (expected )
293298
294299 res = csp .run (
295- get_data , timedelta (seconds = 0 ), len (expected ), starttime = stime , endtime = timedelta (seconds = 30 ), realtime = True
300+ get_data , timedelta (seconds = 0 ), len (expected ), starttime = stime , endtime = timedelta (seconds = 10 ), realtime = True
296301 )["data" ]
297302 assert len (res ) == len (expected )
298303
@@ -314,8 +319,6 @@ def graph(symbols: list, count: int):
314319 msg_mapper = RawBytesMessageMapper ()
315320
316321 done_flags = []
317- topic = f"test_str.{ os .getpid ()} "
318- _precreate_topic (topic )
319322 for symbol in symbols :
320323 topic = f"test_str.{ os .getpid ()} "
321324 kafkaadapter .publish (msg_mapper , topic , symbol , d )
@@ -356,10 +359,13 @@ def graph(symbols: list, count: int):
356359 stop = csp .filter (stop , stop )
357360 csp .stop_engine (stop )
358361
362+ topic = f"test_str.{ os .getpid ()} "
363+ _precreate_topic (topic )
364+
359365 symbols = ["AAPL" , "MSFT" ]
360366 count = 10
361367 results = csp .run (
362- graph , symbols , count , starttime = datetime .utcnow (), endtime = timedelta (seconds = 30 ), realtime = True
368+ graph , symbols , count , starttime = datetime .utcnow (), endtime = timedelta (seconds = 10 ), realtime = True
363369 )
364370 # print(results)
365371 for symbol in symbols :
0 commit comments