@@ -260,8 +260,8 @@ def mytopic_important(event: v1.Event) -> None:
260260You can create a streaming subscription to a PubSub topic using either the ` subscribe `
261261or ` subscribe_handler ` methods.
262262
263- The ` subscribe ` method returns a ` Subscription ` object, which allows you to pull messages from the
264- stream by
263+ The ` subscribe ` method returns an iterable ` Subscription ` object, which allows you to pull messages from the
264+ stream by using a ` for ` loop (ex. ` for message in subscription ` ) or by
265265calling the ` next_message ` method. This will block on the main thread while waiting for messages.
266266When done, you should call the close method to terminate the
267267subscription and stop receiving messages.
@@ -281,7 +281,7 @@ Here's an example of using the `subscribe` method:
281281import time
282282
283283from dapr.clients import DaprClient
284- from dapr.clients.grpc.subscription import StreamInactiveError
284+ from dapr.clients.grpc.subscription import StreamInactiveError, StreamCancelledError
285285
286286counter = 0
287287
@@ -303,30 +303,35 @@ def main():
303303 )
304304
305305 try :
306- while counter < 5 :
307- try :
308- message = subscription.next_message()
306+ for message in subscription:
307+ if message is None :
308+ print (' No message received. The stream might have been cancelled.' )
309+ continue
309310
310- except StreamInactiveError as e:
311+ try :
312+ response_status = process_message(message)
313+
314+ if response_status == ' success' :
315+ subscription.respond_success(message)
316+ elif response_status == ' retry' :
317+ subscription.respond_retry(message)
318+ elif response_status == ' drop' :
319+ subscription.respond_drop(message)
320+
321+ if counter >= 5 :
322+ break
323+ except StreamInactiveError:
311324 print (' Stream is inactive. Retrying...' )
312325 time.sleep(1 )
313326 continue
314- if message is None :
315- print (' No message received within timeout period.' )
316- continue
317-
318- # Process the message
319- response_status = process_message(message)
320-
321- if response_status == ' success' :
322- subscription.respond_success(message)
323- elif response_status == ' retry' :
324- subscription.respond_retry(message)
325- elif response_status == ' drop' :
326- subscription.respond_drop(message)
327+ except StreamCancelledError:
328+ print (' Stream was cancelled' )
329+ break
330+ except Exception as e:
331+ print (f ' Error occurred during message processing: { e} ' )
327332
328333 finally :
329- print (" Closing subscription..." )
334+ print (' Closing subscription...' )
330335 subscription.close()
331336
332337
0 commit comments