diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index b77101c2..3517c01a 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -63,10 +63,6 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: return decorator -class InvalidStateError(RuntimeError): - pass - - ConsumerTiming = Literal[ "arroyo.consumer.poll.time", "arroyo.consumer.processing.time", @@ -210,23 +206,6 @@ def _close_strategy() -> None: self.__metrics_buffer.incr_timing("arroyo.consumer.shutdown.time", value) - def _create_strategy(partitions: Mapping[Partition, int]) -> None: - start_create = time.time() - - self.__processing_strategy = ( - self.__processor_factory.create_with_partitions( - self.__commit, partitions - ) - ) - - self.__metrics_buffer.metrics.timing( - "arroyo.consumer.run.create_strategy", time.time() - start_create - ) - - logger.debug( - "Initialized processing strategy: %r", self.__processing_strategy - ) - @_rdkafka_callback(metrics=self.__metrics_buffer) def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None: logger.info("New partitions assigned: %r", partitions) @@ -243,7 +222,6 @@ def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None: "Partition assignment while processing strategy active" ) _close_strategy() - _create_strategy(partitions) @_rdkafka_callback(metrics=self.__metrics_buffer) def on_partitions_revoked(partitions: Sequence[Partition]) -> None: @@ -256,24 +234,6 @@ def on_partitions_revoked(partitions: Sequence[Partition]) -> None: if partitions: _close_strategy() - # Recreate the strategy if the consumer still has other partitions - # assigned and is not closed or errored - try: - current_partitions = self.__consumer.tell() - if len(current_partitions.keys() - set(partitions)): - active_partitions = { - partition: offset - for partition, offset in current_partitions.items() - if partition not in partitions - } - logger.info( - "Recreating strategy since there are still active 
partitions: %r", - active_partitions, - ) - _create_strategy(active_partitions) - except RuntimeError: - pass - # Partition revocation can happen anytime during the consumer lifecycle and happen # multiple times. What we want to know is that the consumer is not stuck somewhere. # The presence of this message as the last message of a consumer @@ -310,6 +270,19 @@ def __commit(self, offsets: Mapping[Partition, int], force: bool = False) -> Non ) self.__commit_policy_state.did_commit(now, offsets) + def __create_strategy(self, partitions: Mapping[Partition, int]) -> None: + start_create = time.time() + + self.__processing_strategy = self.__processor_factory.create_with_partitions( + self.__commit, partitions + ) + + self.__metrics_buffer.metrics.timing( + "arroyo.consumer.run.create_strategy", time.time() - start_create + ) + + logger.debug("Initialized processing strategy: %r", self.__processing_strategy) + def run(self) -> None: "The main run loop, see class docstring for more information." @@ -387,6 +360,10 @@ def _run_once(self) -> None: except RecoverableError: return + if self.__processing_strategy is None and self.__message is not None: + current_offsets = self.__consumer.tell() + self.__create_strategy(current_offsets) + if self.__processing_strategy is not None: start_poll = time.time() try: @@ -398,58 +375,59 @@ def _run_once(self) -> None: self.__metrics_buffer.incr_timing( "arroyo.consumer.processing.time", time.time() - start_poll ) - if self.__message is not None: - try: - start_submit = time.time() - message = ( - Message(self.__message) if self.__message is not None else None - ) - self.__processing_strategy.submit(message) - self.__metrics_buffer.incr_timing( - "arroyo.consumer.processing.time", - time.time() - start_submit, - ) - except MessageRejected as e: - # If the processing strategy rejected our message, we need - # to pause the consumer and hold the message until it is - # accepted, at which point we can resume consuming. 
- # if not message_carried_over: - if self.__backpressure_timestamp is None: - self.__backpressure_timestamp = time.time() - - elif not self.__is_paused and ( - time.time() - self.__backpressure_timestamp > 1 - ): - logger.debug( - "Caught %r while submitting %r, pausing consumer...", - e, - self.__message, - ) - self.__consumer.pause([*self.__consumer.tell().keys()]) - self.__is_paused = True - - else: - time.sleep(0.01) + if self.__message is not None: + # in a previous if-stmt, we have ensured that + # self.__processing_strategy is available if self.__message is not + # None + assert self.__processing_strategy is not None - except InvalidMessage as e: - self._handle_invalid_message(e) + try: + start_submit = time.time() + message = ( + Message(self.__message) if self.__message is not None else None + ) + self.__processing_strategy.submit(message) - else: - # Resume if we are currently in a paused state - if self.__is_paused: - self.__consumer.resume([*self.__consumer.tell().keys()]) - self.__is_paused = False - - # Clear backpressure timestamp if it is set - self._clear_backpressure() - - self.__message = None - else: - if self.__message is not None: - raise InvalidStateError( - "received message without active processing strategy" + self.__metrics_buffer.incr_timing( + "arroyo.consumer.processing.time", + time.time() - start_submit, ) + except MessageRejected as e: + # If the processing strategy rejected our message, we need + # to pause the consumer and hold the message until it is + # accepted, at which point we can resume consuming. 
+ # if not message_carried_over: + if self.__backpressure_timestamp is None: + self.__backpressure_timestamp = time.time() + + elif not self.__is_paused and ( + time.time() - self.__backpressure_timestamp > 1 + ): + logger.debug( + "Caught %r while submitting %r, pausing consumer...", + e, + self.__message, + ) + self.__consumer.pause([*self.__consumer.tell().keys()]) + self.__is_paused = True + + else: + time.sleep(0.01) + + except InvalidMessage as e: + self._handle_invalid_message(e) + + else: + # Resume if we are currently in a paused state + if self.__is_paused: + self.__consumer.resume([*self.__consumer.tell().keys()]) + self.__is_paused = False + + # Clear backpressure timestamp if it is set + self._clear_backpressure() + + self.__message = None def signal_shutdown(self) -> None: """ diff --git a/tests/processing/test_processor.py b/tests/processing/test_processor.py index 31cf8719..e133e766 100644 --- a/tests/processing/test_processor.py +++ b/tests/processing/test_processor.py @@ -11,7 +11,7 @@ from arroyo.backends.local.storages.memory import MemoryMessageStorage from arroyo.commit import IMMEDIATE, CommitPolicy from arroyo.dlq import DlqPolicy, InvalidMessage -from arroyo.processing.processor import InvalidStateError, StreamProcessor +from arroyo.processing.processor import StreamProcessor from arroyo.processing.strategies import Healthcheck from arroyo.processing.strategies.abstract import ( MessageRejected, @@ -61,18 +61,18 @@ def test_stream_processor_lifecycle() -> None: offsets = {Partition(topic, 0): 0} assignment_callback(offsets) - # If ``Consumer.poll`` doesn't return a message, we should poll the - # processing strategy, but not submit anything for processing. 
+ # If ``Consumer.poll`` doesn't return a message, we should not have created + # or polled a strategy yet consumer.poll.return_value = None with assert_changes( - lambda: int(strategy.poll.call_count), 0, 1 + lambda: int(strategy.poll.call_count), 0, 0 ), assert_does_not_change(lambda: int(strategy.submit.call_count), 0): processor._run_once() # If ``Consumer.poll`` **does** return a message, we should poll the # processing strategy and submit the message for processing. consumer.poll.return_value = message.value - with assert_changes(lambda: int(strategy.poll.call_count), 1, 2), assert_changes( + with assert_changes(lambda: int(strategy.poll.call_count), 0, 1), assert_changes( lambda: int(strategy.submit.call_count), 0, 1 ): processor._run_once() @@ -98,8 +98,7 @@ def test_stream_processor_lifecycle() -> None: processor._run_once() assert strategy.submit.call_args_list[-1] == mock.call(message) - # Strategy should be closed and recreated if it already exists and - # we got another partition assigned. + # Strategy should be closed, but not created again. with assert_changes(lambda: int(strategy.close.call_count), 0, 1): assignment_callback({Partition(topic, 0): 0}) @@ -107,19 +106,15 @@ def test_stream_processor_lifecycle() -> None: # strategy instance to be closed. consumer.tell.return_value = {} - with assert_changes(lambda: int(strategy.close.call_count), 1, 2): + # there should not be an active strategy between assigning and revoking, so + # closing should not happen. + with assert_changes(lambda: int(strategy.close.call_count), 1, 1): revocation_callback([Partition(topic, 0)]) # Revocation should noop without an active assignment. revocation_callback([Partition(topic, 0)]) revocation_callback([Partition(topic, 0)]) - # The processor should not accept non-heartbeat messages without an - # assignment or active processor. 
- consumer.poll.return_value = message.value - with pytest.raises(InvalidStateError): - processor._run_once() - with assert_changes(lambda: int(consumer.close.call_count), 0, 1), assert_changes( lambda: int(factory.shutdown.call_count), 0, 1 ): @@ -127,18 +122,16 @@ def test_stream_processor_lifecycle() -> None: assert list((type(call), call.name) for call in metrics.calls) == [ (Increment, "arroyo.consumer.partitions_assigned.count"), - (Timing, "arroyo.consumer.run.create_strategy"), (Timing, "arroyo.consumer.run.callback"), + (Timing, "arroyo.consumer.run.create_strategy"), (Timing, "arroyo.consumer.poll.time"), (Timing, "arroyo.consumer.callback.time"), (Timing, "arroyo.consumer.processing.time"), (Increment, "arroyo.consumer.run.count"), (Increment, "arroyo.consumer.partitions_assigned.count"), (Timing, "arroyo.consumer.run.close_strategy"), - (Timing, "arroyo.consumer.run.create_strategy"), (Timing, "arroyo.consumer.run.callback"), (Increment, "arroyo.consumer.partitions_revoked.count"), - (Timing, "arroyo.consumer.run.close_strategy"), (Timing, "arroyo.consumer.run.callback"), (Increment, "arroyo.consumer.partitions_revoked.count"), (Timing, "arroyo.consumer.run.callback"), @@ -149,7 +142,6 @@ def test_stream_processor_lifecycle() -> None: (Timing, "arroyo.consumer.join.time"), (Timing, "arroyo.consumer.shutdown.time"), (Timing, "arroyo.consumer.callback.time"), - (Timing, "arroyo.consumer.poll.time"), (Increment, "arroyo.consumer.run.count"), ] @@ -278,10 +270,18 @@ def test_stream_processor_invalid_message_from_submit() -> None: def test_stream_processor_create_with_partitions() -> None: topic = Topic("topic") + partition = Partition(topic, 0) + offset = 0 + now = datetime.now() + payload = 0 + + message = Message(BrokerValue(payload, partition, offset, now)) + consumer = mock.Mock() strategy = mock.Mock() factory = mock.Mock() factory.create_with_partitions.return_value = strategy + consumer.poll.return_value = message.value with assert_changes(lambda:
int(consumer.subscribe.call_count), 0, 1): processor: StreamProcessor[int] = StreamProcessor( @@ -296,7 +296,9 @@ def test_stream_processor_create_with_partitions() -> None: # First partition assigned offsets_p0 = {Partition(topic, 0): 0} + consumer.tell.return_value = offsets_p0 assignment_callback(offsets_p0) + processor._run_once() create_args, _ = factory.create_with_partitions.call_args assert factory.create_with_partitions.call_count == 1 @@ -304,17 +306,20 @@ def test_stream_processor_create_with_partitions() -> None: # Second partition assigned offsets_p1 = {Partition(topic, 1): 0} + consumer.tell.return_value = {**offsets_p0, **offsets_p1} assignment_callback(offsets_p1) + processor._run_once() create_args, _ = factory.create_with_partitions.call_args assert factory.create_with_partitions.call_count == 2 - assert create_args[1] == offsets_p1 - + assert create_args[1] == {**offsets_p0, **offsets_p1} processor._run_once() # First partition revoked consumer.tell.return_value = {**offsets_p0, **offsets_p1} revocation_callback([Partition(topic, 0)]) + consumer.tell.return_value = offsets_p1 + processor._run_once() create_args, _ = factory.create_with_partitions.call_args assert factory.create_with_partitions.call_count == 3