fix(processor): Create strategy lazily on first message #317

Open · wants to merge 1 commit into base: main
154 changes: 66 additions & 88 deletions arroyo/processing/processor.py
@@ -63,10 +63,6 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
     return decorator


-class InvalidStateError(RuntimeError):
-    pass
-
-
 ConsumerTiming = Literal[
     "arroyo.consumer.poll.time",
     "arroyo.consumer.processing.time",
@@ -210,23 +206,6 @@ def _close_strategy() -> None:

             self.__metrics_buffer.incr_timing("arroyo.consumer.shutdown.time", value)
 
-        def _create_strategy(partitions: Mapping[Partition, int]) -> None:
-            start_create = time.time()
-
-            self.__processing_strategy = (
-                self.__processor_factory.create_with_partitions(
-                    self.__commit, partitions
-                )
-            )
-
-            self.__metrics_buffer.metrics.timing(
-                "arroyo.consumer.run.create_strategy", time.time() - start_create
-            )
-
-            logger.debug(
-                "Initialized processing strategy: %r", self.__processing_strategy
-            )
-
         @_rdkafka_callback(metrics=self.__metrics_buffer)
         def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None:
             logger.info("New partitions assigned: %r", partitions)
@@ -243,7 +222,6 @@ def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None:
"Partition assignment while processing strategy active"
)
_close_strategy()
_create_strategy(partitions)
lynnagara (Member) commented on Dec 21, 2023:

Should we assert that self.__processing_strategy is None here, or make sure we close if it is not None? What would happen if we get an assign without a prior revocation? I know that doesn't actually happen today, but the code right now is designed to work for incremental assignment as well, which might not be the case after this change.

Member (Author) replied:

Not sure I follow: we do close it on the line before, exactly when the strategy is not None.

Member (Author) replied:

FWIW, this part of rebalancing was actually broken with incremental assign, as we would create the strategy with the subset of partitions from this callback instead of the full assignment. With this PR we're now only calling _create_strategy with consumer.tell(), which should always contain the full assignment.
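A toy illustration of the failure mode described above; the FakeConsumer class and partition names are hypothetical stand-ins, not arroyo's API:

```python
from typing import Dict


class FakeConsumer:
    """Tracks the full assignment the way a consumer's tell() reports it."""

    def __init__(self) -> None:
        self.offsets: Dict[str, int] = {}

    def incremental_assign(self, partitions: Dict[str, int]) -> None:
        # An incremental assign callback only receives the *new* partitions...
        self.offsets.update(partitions)

    def tell(self) -> Dict[str, int]:
        # ...while tell() always reflects the full current assignment.
        return dict(self.offsets)


consumer = FakeConsumer()
consumer.incremental_assign({"topic-0": 0})
consumer.incremental_assign({"topic-1": 5})  # a second rebalance adds a partition

callback_partitions = {"topic-1": 5}  # what the callback saw last
assert consumer.tell() == {"topic-0": 0, "topic-1": 5}
# Creating the strategy from the callback's partitions would silently drop topic-0:
assert callback_partitions != consumer.tell()
```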


         @_rdkafka_callback(metrics=self.__metrics_buffer)
         def on_partitions_revoked(partitions: Sequence[Partition]) -> None:
@@ -256,24 +234,6 @@ def on_partitions_revoked(partitions: Sequence[Partition]) -> None:
             if partitions:
                 _close_strategy()
 
-                # Recreate the strategy if the consumer still has other partitions
-                # assigned and is not closed or errored
-                try:
-                    current_partitions = self.__consumer.tell()
-                    if len(current_partitions.keys() - set(partitions)):
-                        active_partitions = {
-                            partition: offset
-                            for partition, offset in current_partitions.items()
-                            if partition not in partitions
-                        }
-                        logger.info(
-                            "Recreating strategy since there are still active partitions: %r",
-                            active_partitions,
-                        )
-                        _create_strategy(active_partitions)
-                except RuntimeError:
-                    pass
-
lynnagara (Member) commented on lines -259 to -276:

Just a note that we don't get a subset of partitions revoked today, as we are using eager rebalancing. But that may change in the future. This code was written this way so it also works with incremental rebalancing, even though we never actually recreate the strategy on partitions revoked.

Member (Author) replied:

Yes, I think we're still handling that case correctly. This code effectively moved from the callback into _run_once; both times we recreate the strategy with the return value of consumer.tell(), and we still close and flush the strategy if any partition has been revoked.

Just the condition under which we create the strategy changed: before, we would recreate the strategy if we still had partitions assigned; now we create it when no strategy exists and consumer.poll() returned a message.
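A minimal sketch of the resulting control flow, assuming stand-in names (LazyProcessor, run_once) rather than the real StreamProcessor internals:

```python
class LazyProcessor:
    """Simplified model of lazy strategy creation on the first polled message."""

    def __init__(self, consumer, factory, commit):
        self.consumer = consumer
        self.factory = factory
        self.commit = commit
        self.strategy = None
        self.message = None

    def run_once(self) -> None:
        self.message = self.consumer.poll(timeout=0.1)

        # Create the strategy lazily: only once a message actually arrives,
        # and always from the full assignment reported by tell().
        if self.strategy is None and self.message is not None:
            self.strategy = self.factory.create_with_partitions(
                self.commit, self.consumer.tell()
            )

        if self.strategy is not None:
            self.strategy.poll()
            if self.message is not None:
                self.strategy.submit(self.message)
                self.message = None
```

Because tell() is consulted at creation time, the strategy always starts from the consumer's full current assignment, regardless of which subset any rebalance callback happened to see.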

             # Partition revocation can happen anytime during the consumer lifecycle and happen
             # multiple times. What we want to know is that the consumer is not stuck somewhere.
             # The presence of this message as the last message of a consumer
@@ -310,6 +270,19 @@ def __commit(self, offsets: Mapping[Partition, int], force: bool = False) -> None:
             )
             self.__commit_policy_state.did_commit(now, offsets)
 
+    def __create_strategy(self, partitions: Mapping[Partition, int]) -> None:
+        start_create = time.time()
+
+        self.__processing_strategy = self.__processor_factory.create_with_partitions(
+            self.__commit, partitions
+        )
+
+        self.__metrics_buffer.metrics.timing(
+            "arroyo.consumer.run.create_strategy", time.time() - start_create
+        )
+
+        logger.debug("Initialized processing strategy: %r", self.__processing_strategy)
+
     def run(self) -> None:
         "The main run loop, see class docstring for more information."
 
@@ -387,6 +360,10 @@ def _run_once(self) -> None:
         except RecoverableError:
             return
 
+        if self.__processing_strategy is None and self.__message is not None:
+            current_offsets = self.__consumer.tell()
+            self.__create_strategy(current_offsets)
+
         if self.__processing_strategy is not None:
             start_poll = time.time()
             try:
@@ -398,58 +375,59 @@ def _run_once(self) -> None:
                 self.__metrics_buffer.incr_timing(
                     "arroyo.consumer.processing.time", time.time() - start_poll
                 )
-            if self.__message is not None:
-                try:
-                    start_submit = time.time()
-                    message = (
-                        Message(self.__message) if self.__message is not None else None
-                    )
-                    self.__processing_strategy.submit(message)
-
-                    self.__metrics_buffer.incr_timing(
-                        "arroyo.consumer.processing.time",
-                        time.time() - start_submit,
-                    )
-                except MessageRejected as e:
-                    # If the processing strategy rejected our message, we need
-                    # to pause the consumer and hold the message until it is
-                    # accepted, at which point we can resume consuming.
-                    # if not message_carried_over:
-                    if self.__backpressure_timestamp is None:
-                        self.__backpressure_timestamp = time.time()
-
-                    elif not self.__is_paused and (
-                        time.time() - self.__backpressure_timestamp > 1
-                    ):
-                        logger.debug(
-                            "Caught %r while submitting %r, pausing consumer...",
-                            e,
-                            self.__message,
-                        )
-                        self.__consumer.pause([*self.__consumer.tell().keys()])
-                        self.__is_paused = True
-
-                    else:
-                        time.sleep(0.01)
-                except InvalidMessage as e:
-                    self._handle_invalid_message(e)
-                else:
-                    # Resume if we are currently in a paused state
-                    if self.__is_paused:
-                        self.__consumer.resume([*self.__consumer.tell().keys()])
-                        self.__is_paused = False
-
-                    # Clear backpressure timestamp if it is set
-                    self._clear_backpressure()
-
-                    self.__message = None
-        else:
-            if self.__message is not None:
-                raise InvalidStateError(
-                    "received message without active processing strategy"
-                )
+        if self.__message is not None:
+            # in a previous if-stmt, we have ensured that
+            # self.__processing_strategy is available if self.__message is not
+            # None
+            assert self.__processing_strategy is not None
+
+            try:
+                start_submit = time.time()
+                message = (
+                    Message(self.__message) if self.__message is not None else None
+                )
+                self.__processing_strategy.submit(message)
+
+                self.__metrics_buffer.incr_timing(
+                    "arroyo.consumer.processing.time",
+                    time.time() - start_submit,
+                )
+            except MessageRejected as e:
+                # If the processing strategy rejected our message, we need
+                # to pause the consumer and hold the message until it is
+                # accepted, at which point we can resume consuming.
+                # if not message_carried_over:
+                if self.__backpressure_timestamp is None:
+                    self.__backpressure_timestamp = time.time()
+
+                elif not self.__is_paused and (
+                    time.time() - self.__backpressure_timestamp > 1
+                ):
+                    logger.debug(
+                        "Caught %r while submitting %r, pausing consumer...",
+                        e,
+                        self.__message,
+                    )
+                    self.__consumer.pause([*self.__consumer.tell().keys()])
+                    self.__is_paused = True
+
+                else:
+                    time.sleep(0.01)
+
+            except InvalidMessage as e:
+                self._handle_invalid_message(e)
+
+            else:
+                # Resume if we are currently in a paused state
+                if self.__is_paused:
+                    self.__consumer.resume([*self.__consumer.tell().keys()])
+                    self.__is_paused = False
+
+                # Clear backpressure timestamp if it is set
+                self._clear_backpressure()
+
+                self.__message = None
 
     def signal_shutdown(self) -> None:
         """
45 changes: 25 additions & 20 deletions tests/processing/test_processor.py
@@ -11,7 +11,7 @@
 from arroyo.backends.local.storages.memory import MemoryMessageStorage
 from arroyo.commit import IMMEDIATE, CommitPolicy
 from arroyo.dlq import DlqPolicy, InvalidMessage
-from arroyo.processing.processor import InvalidStateError, StreamProcessor
+from arroyo.processing.processor import StreamProcessor
 from arroyo.processing.strategies import Healthcheck
 from arroyo.processing.strategies.abstract import (
     MessageRejected,
@@ -61,18 +61,18 @@ def test_stream_processor_lifecycle() -> None:
     offsets = {Partition(topic, 0): 0}
     assignment_callback(offsets)
 
-    # If ``Consumer.poll`` doesn't return a message, we should poll the
-    # processing strategy, but not submit anything for processing.
+    # If ``Consumer.poll`` doesn't return a message, we should not have created
+    # or polled a strategy yet
     consumer.poll.return_value = None
     with assert_changes(
-        lambda: int(strategy.poll.call_count), 0, 1
+        lambda: int(strategy.poll.call_count), 0, 0
     ), assert_does_not_change(lambda: int(strategy.submit.call_count), 0):
         processor._run_once()
 
     # If ``Consumer.poll`` **does** return a message, we should poll the
     # processing strategy and submit the message for processing.
     consumer.poll.return_value = message.value
-    with assert_changes(lambda: int(strategy.poll.call_count), 1, 2), assert_changes(
+    with assert_changes(lambda: int(strategy.poll.call_count), 0, 1), assert_changes(
         lambda: int(strategy.submit.call_count), 0, 1
     ):
         processor._run_once()
@@ -98,47 +98,40 @@ def test_stream_processor_lifecycle() -> None:
         processor._run_once()
     assert strategy.submit.call_args_list[-1] == mock.call(message)
 
-    # Strategy should be closed and recreated if it already exists and
-    # we got another partition assigned.
+    # Strategy should be closed, but not created again.
     with assert_changes(lambda: int(strategy.close.call_count), 0, 1):
         assignment_callback({Partition(topic, 0): 0})
 
-    # Revocation should succeed with an active assignment, and cause the
-    # strategy instance to be closed.
     consumer.tell.return_value = {}
 
-    with assert_changes(lambda: int(strategy.close.call_count), 1, 2):
+    # there should not be an active strategy between assigning and revoking, so
+    # closing should not happen.
+    with assert_changes(lambda: int(strategy.close.call_count), 1, 1):
         revocation_callback([Partition(topic, 0)])
 
     # Revocation should noop without an active assignment.
     revocation_callback([Partition(topic, 0)])
     revocation_callback([Partition(topic, 0)])
 
-    # The processor should not accept non-heartbeat messages without an
-    # assignment or active processor.
-    consumer.poll.return_value = message.value
-    with pytest.raises(InvalidStateError):
-        processor._run_once()
-
     with assert_changes(lambda: int(consumer.close.call_count), 0, 1), assert_changes(
         lambda: int(factory.shutdown.call_count), 0, 1
     ):
         processor._shutdown()
 
     assert list((type(call), call.name) for call in metrics.calls) == [
         (Increment, "arroyo.consumer.partitions_assigned.count"),
-        (Timing, "arroyo.consumer.run.create_strategy"),
         (Timing, "arroyo.consumer.run.callback"),
+        (Timing, "arroyo.consumer.run.create_strategy"),
         (Timing, "arroyo.consumer.poll.time"),
         (Timing, "arroyo.consumer.callback.time"),
         (Timing, "arroyo.consumer.processing.time"),
         (Increment, "arroyo.consumer.run.count"),
         (Increment, "arroyo.consumer.partitions_assigned.count"),
         (Timing, "arroyo.consumer.run.close_strategy"),
-        (Timing, "arroyo.consumer.run.create_strategy"),
         (Timing, "arroyo.consumer.run.callback"),
         (Increment, "arroyo.consumer.partitions_revoked.count"),
-        (Timing, "arroyo.consumer.run.close_strategy"),
         (Timing, "arroyo.consumer.run.callback"),
         (Increment, "arroyo.consumer.partitions_revoked.count"),
         (Timing, "arroyo.consumer.run.callback"),
@@ -149,7 +142,6 @@ def test_stream_processor_lifecycle() -> None:
(Timing, "arroyo.consumer.join.time"),
(Timing, "arroyo.consumer.shutdown.time"),
(Timing, "arroyo.consumer.callback.time"),
(Timing, "arroyo.consumer.poll.time"),
(Increment, "arroyo.consumer.run.count"),
]

@@ -278,10 +270,18 @@ def test_stream_processor_invalid_message_from_submit() -> None:
 def test_stream_processor_create_with_partitions() -> None:
     topic = Topic("topic")
 
+    partition = Partition(topic, 0)
+    offset = 0
+    now = datetime.now()
+    payload = 0
+
+    message = Message(BrokerValue(payload, partition, offset, now))
+
     consumer = mock.Mock()
     strategy = mock.Mock()
     factory = mock.Mock()
     factory.create_with_partitions.return_value = strategy
+    consumer.poll.return_value = message
 
     with assert_changes(lambda: int(consumer.subscribe.call_count), 0, 1):
         processor: StreamProcessor[int] = StreamProcessor(
@@ -296,25 +296,30 @@

     # First partition assigned
     offsets_p0 = {Partition(topic, 0): 0}
+    consumer.tell.return_value = offsets_p0
     assignment_callback(offsets_p0)
+    processor._run_once()
 
     create_args, _ = factory.create_with_partitions.call_args
     assert factory.create_with_partitions.call_count == 1
     assert create_args[1] == offsets_p0
 
     # Second partition assigned
     offsets_p1 = {Partition(topic, 1): 0}
+    consumer.tell.return_value = {**offsets_p0, **offsets_p1}
     assignment_callback(offsets_p1)
+    processor._run_once()
 
     create_args, _ = factory.create_with_partitions.call_args
     assert factory.create_with_partitions.call_count == 2
-    assert create_args[1] == offsets_p1
+    assert create_args[1] == {**offsets_p0, **offsets_p1}
 
     processor._run_once()
 
     # First partition revoked
     consumer.tell.return_value = {**offsets_p0, **offsets_p1}
     revocation_callback([Partition(topic, 0)])
+    consumer.tell.return_value = offsets_p1
     processor._run_once()
 
     create_args, _ = factory.create_with_partitions.call_args
     assert factory.create_with_partitions.call_count == 3