diff --git a/hathor/event/event_manager.py b/hathor/event/event_manager.py index d93c0b8af..d8c0a10a4 100644 --- a/hathor/event/event_manager.py +++ b/hathor/event/event_manager.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Callable, Iterable, Iterator, Optional +from typing import Callable, Iterator, Optional from structlog import get_logger @@ -24,9 +24,12 @@ from hathor.pubsub import EventArguments, HathorEvents, PubSubManager from hathor.transaction import BaseTransaction from hathor.util import Reactor, progress +from hathor.utils.iter import batch_iterator logger = get_logger() +N_LOAD_EVENTS_PER_BATCH = 10_000 + _GROUP_START_EVENTS = { EventType.REORG_STARTED, } @@ -258,20 +261,30 @@ def handle_load_phase_vertices( if not self._should_reload_events(): return - def create_event_batch() -> Iterable[BaseEvent]: - assert self._event_ws_factory is not None - self.log.info('Starting creating events from existing database...') + self.log.info('Started creating events from existing database...') + event_iterator = self._create_event_iterator(topological_iterator, total_vertices) + event_batches = batch_iterator(event_iterator, N_LOAD_EVENTS_PER_BATCH) + + for batch in event_batches: + self._event_storage.save_events(batch) - for vertex in progress(topological_iterator, log=self.log, total=total_vertices): - event = self._handle_event_creation( - event_type=EventType.NEW_VERTEX_ACCEPTED, - event_args=EventArguments(tx=vertex) - ) + self.log.info('Finished creating events from existing database.') - yield event - self._event_ws_factory.broadcast_event(event) - self._last_event = event + def _create_event_iterator( + self, + topological_iterator: Iterator[BaseTransaction], + total_vertices: int + ) -> Iterator[BaseEvent]: + """Given a topological iterator of txs, create an iterator of events while also tracking progress and + broadcasting them.""" + assert self._event_ws_factory is not None - self.log.info('Finished creating events from existing database.') + for vertex in progress(topological_iterator, log=self.log, total=total_vertices): + event = self._handle_event_creation( + event_type=EventType.NEW_VERTEX_ACCEPTED, + event_args=EventArguments(tx=vertex) + ) - self._event_storage.save_events(create_event_batch()) + yield event + self._event_ws_factory.broadcast_event(event) + self._last_event = event diff --git a/hathor/utils/iter.py b/hathor/utils/iter.py new file mode 100644 index 000000000..73139b979 --- /dev/null +++ b/hathor/utils/iter.py @@ -0,0 +1,44 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Iterable, TypeVar + +T = TypeVar('T') + + +def batch_iterator(iterator: Iterable[T], batch_size: int) -> Iterable[list[T]]: + """ + Yield batches of up to batch_size items from iterator. + + >>> list(batch_iterator([], 10)) + [] + >>> list(batch_iterator([1, 2, 3, 4], 1)) + [[1], [2], [3], [4]] + >>> list(batch_iterator([1, 2, 3, 4], 2)) + [[1, 2], [3, 4]] + >>> list(batch_iterator([1, 2, 3, 4], 3)) + [[1, 2, 3], [4]] + >>> list(batch_iterator([1, 2, 3, 4], 4)) + [[1, 2, 3, 4]] + """ + assert batch_size >= 1 + batch = [] + for item in iterator: + batch.append(item) + if len(batch) >= batch_size: + yield batch + batch = [] + + if batch: + yield batch