diff --git a/npiai/tools/web/scraper/app.py b/npiai/tools/web/scraper/app.py index 91a7b6a..5372dc9 100644 --- a/npiai/tools/web/scraper/app.py +++ b/npiai/tools/web/scraper/app.py @@ -133,6 +133,7 @@ async def summarize_stream( batch_index = 0 results_queue: asyncio.Queue[SummaryChunk] = asyncio.Queue() + lock = asyncio.Lock() skip_item_hashes_set = set(skip_item_hashes) if skip_item_hashes else None @@ -142,14 +143,17 @@ async def run_batch(): if limit != -1 and remaining <= 0: return - current_index = batch_index - batch_index += 1 + async with lock: + current_index = batch_index + batch_index += 1 - # calculate the number of items to summarize in the current batch - requested_count = min(self._batch_size, remaining) if limit != -1 else -1 - # reduce the remaining count by the number of items in the current batch - # so that the other tasks will not exceed the limit - remaining -= requested_count + # calculate the number of items to summarize in the current batch + requested_count = ( + min(self._batch_size, remaining) if limit != -1 else -1 + ) + # reduce the remaining count by the number of items in the current batch + # so that the other tasks will not exceed the limit + remaining -= requested_count parsed_result = await self._convert( ancestor_selector=ancestor_selector, @@ -179,12 +183,13 @@ async def run_batch(): await ctx.send_debug_message(f"[{self.name}] No items summarized") return - items_slice = items[:requested_count] if limit != -1 else items - summarized_count = len(items_slice) - count += summarized_count - # correct the remaining count in case summary returned fewer items than requested - if summarized_count < requested_count: - remaining += requested_count - summarized_count + async with lock: + items_slice = items[:requested_count] if limit != -1 else items + summarized_count = len(items_slice) + count += summarized_count + # recalculate the remaining count in case summary returned fewer items than requested + if summarized_count < requested_count: + remaining += requested_count - summarized_count await results_queue.put( {