@@ -17,25 +17,24 @@ def GetPartitionIds(self):
1717 return partition_ids
1818
1919 def SendAndReceiveEvents (self , partitionID ):
20- with self .client .create_consumer (consumer_group = "$default" , partition_id = partitionID , event_position = EventPosition (datetime .utcnow ())) as consumer :
20+ consumer = self .client .create_consumer (consumer_group = "$default" , partition_id = partitionID , event_position = EventPosition (datetime .utcnow ()))
2121
22- print ("Sending events..." )
23- with self .client .create_producer (partition_id = partitionID ) as producer :
24- event_list = [
25- EventData (b"Test Event 1 in Python" ),
26- EventData (b"Test Event 2 in Python" ),EventData (b"Test Event 3 in Python" )]
27- producer .send (event_list )
28- print ("\t done" )
29-
30- print ("Receiving events..." )
31- received = consumer .receive (max_batch_size = len (event_list ), timeout = 2 )
32- for event_data in received :
33- print ("\t Event Received: " + event_data .body_as_str ())
34-
35- print ("\t done" )
22+ print ("Sending events..." )
23+ producer = self .client .create_producer (partition_id = partitionID )
24+ event_list = [EventData (b"Test Event 1 in Python" ),EventData (b"Test Event 2 in Python" ),EventData (b"Test Event 3 in Python" )]
25+ producer .send (event_list )
26+ producer .close ()
27+ print ("\t done" )
28+
29+ print ("Receiving events..." )
30+ received = consumer .receive (max_batch_size = len (event_list ), timeout = 2 )
31+ for event_data in received :
32+ print ("\t Event Received: " + event_data .body_as_str ())
33+ consumer .close ()
34+ print ("\t done" )
3635
37- if (len (received ) != len (event_list )):
38- raise Exception ("Error, expecting {0} events, but {1} were received." .format (str (len (event_list )),str (len (received ))))
36+ if (len (received ) != len (event_list )):
37+ raise Exception ("Error, expecting {0} events, but {1} were received." .format (str (len (event_list )),str (len (received ))))
3938
4039 def Run (self ):
4140 print ()
0 commit comments