backend/btrixcloud/basecrawls.py (12 changes: 11 additions & 1 deletion)
@@ -372,14 +372,21 @@ async def delete_crawls(
size += crawl_size

cid = crawl.cid
successful = crawl.state in SUCCESSFUL_STATES
if cid:
if cids_to_update.get(cid):
cids_to_update[cid]["inc"] += 1
cids_to_update[cid]["size"] += crawl_size
if successful:
cids_to_update[cid]["successful"] += 1
else:
cids_to_update[cid] = {}
cids_to_update[cid]["inc"] = 1
cids_to_update[cid]["size"] = crawl_size
if successful:
cids_to_update[cid]["successful"] = 1
else:
cids_to_update[cid]["successful"] = 0

if type_ == "crawl":
asyncio.create_task(
@@ -890,7 +897,10 @@ async def delete_crawls_all_types(
for cid, cid_dict in cids_to_update.items():
cid_size = cid_dict["size"]
cid_inc = cid_dict["inc"]
await self.crawl_configs.stats_recompute_last(cid, -cid_size, -cid_inc)
cid_successful = cid_dict["successful"]
await self.crawl_configs.stats_recompute_last(
cid, -cid_size, -cid_inc, -cid_successful
)

if uploads_length:
upload_delete_list = DeleteCrawlList(crawl_ids=uploads)
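The hunks above make `delete_crawls` track, per workflow (`cid`), how many of the deleted crawls had finished successfully, so the deltas later passed to `stats_recompute_last` can decrement `crawlSuccessfulCount` accurately. As a rough standalone sketch (not part of the diff; everything except the dict keys `inc`, `size`, and `successful` and the `crawl.cid`/`crawl.state` attributes is made up), the same accumulation could be written with `collections.defaultdict`:

```python
# Illustrative sketch only; mirrors the accumulator built in the hunk above.
from collections import defaultdict
from typing import Iterable, Tuple


def accumulate_deltas(
    crawls_with_sizes: Iterable[Tuple[object, int]], successful_states: frozenset
) -> dict:
    """Group per-crawl deltas by workflow id (cid) for a later stats update."""
    cids_to_update: dict = defaultdict(lambda: {"inc": 0, "size": 0, "successful": 0})
    for crawl, crawl_size in crawls_with_sizes:
        if not crawl.cid:
            continue
        entry = cids_to_update[crawl.cid]
        entry["inc"] += 1            # one more deleted crawl for this workflow
        entry["size"] += crawl_size  # bytes reclaimed from this workflow
        if crawl.state in successful_states:
            entry["successful"] += 1  # only successful crawls affect crawlSuccessfulCount
    return dict(cids_to_update)
```

Either form yields the per-`cid` deltas that `delete_crawls_all_types` negates and forwards to `stats_recompute_last`.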
backend/btrixcloud/crawlconfigs.py (93 changes: 58 additions & 35 deletions)
@@ -28,6 +28,7 @@
import aiohttp
from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response
import pymongo
from motor.motor_asyncio import AsyncIOMotorCollection

from .pagination import DEFAULT_PAGE_SIZE, paginated_format
from .models import (
@@ -46,7 +47,6 @@
PaginatedSeedResponse,
PaginatedConfigRevisionResponse,
SUCCESSFUL_STATES,
FAILED_STATES,
CrawlerChannel,
CrawlerChannels,
StartedResponse,
@@ -948,7 +948,9 @@ async def get_last_successful_crawl_out(

return None

async def stats_recompute_last(self, cid: UUID, size: int, inc_crawls: int = 1):
async def stats_recompute_last(
self, cid: UUID, size: int, inc_crawls: int = 1, inc_successful: int = 1
):
"""recompute stats by incrementing size counter and number of crawls"""
update_query: dict[str, object] = {}

@@ -1005,7 +1007,7 @@ async def stats_recompute_last(self, cid: UUID, size: int, inc_crawls: int = 1):
"$inc": {
"totalSize": size,
"crawlCount": inc_crawls,
"crawlSuccessfulCount": inc_crawls,
"crawlSuccessfulCount": inc_successful,
},
},
)
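Net effect of the two hunks above: `stats_recompute_last` gains a separate `inc_successful` delta instead of reusing `inc_crawls` for `crawlSuccessfulCount`, so deleting or finishing a failed crawl no longer moves the successful counter. A minimal sketch of the resulting `$inc` update, assuming an `AsyncIOMotorCollection` named `crawl_configs` (the real method builds a larger update document around this):

```python
# Sketch only: the successful-crawl counter is now adjusted independently.
# Removing one failed crawl would pass size=-crawl_size, inc_crawls=-1, inc_successful=0.
async def apply_stats_delta(crawl_configs, cid, size, inc_crawls, inc_successful):
    await crawl_configs.find_one_and_update(
        {"_id": cid, "inactive": {"$ne": True}},
        {
            "$inc": {
                "totalSize": size,
                "crawlCount": inc_crawls,
                "crawlSuccessfulCount": inc_successful,  # decoupled from inc_crawls
            }
        },
    )
```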
@@ -1542,7 +1544,12 @@ async def validate_custom_behavior(self, url: str) -> Dict[str, bool]:

# ============================================================================
# pylint: disable=too-many-locals
async def stats_recompute_all(crawl_configs, crawls, cid: UUID):
async def stats_recompute_all(
crawl_config_ops: CrawlConfigOps,
crawl_configs: AsyncIOMotorCollection,
crawls: AsyncIOMotorCollection,
cid: UUID,
) -> bool:
"""Re-calculate and update crawl statistics for config.

Should only be called when a crawl completes from operator or on migration
@@ -1552,15 +1559,16 @@ async def stats_recompute_all(crawl_configs, crawls, cid: UUID):

match_query = {"cid": cid, "finished": {"$ne": None}}
count = await crawls.count_documents(match_query)
if count:
update_query["crawlCount"] = count

total_size = 0
successful_count = 0
update_query["crawlCount"] = count

last_crawl: Optional[dict[str, object]] = None
last_crawl_size = 0
total_size = 0
successful_count = 0

last_crawl: Optional[dict[str, object]] = None
last_crawl_size = 0

if count:
async for res in crawls.find(match_query).sort("finished", pymongo.ASCENDING):
files = res.get("files", [])
crawl_size = 0
@@ -1569,43 +1577,58 @@ async def stats_recompute_all(crawl_configs, crawls, cid: UUID):

total_size += crawl_size

if res["state"] not in FAILED_STATES:
if res["state"] in SUCCESSFUL_STATES:
successful_count += 1

last_crawl = res
last_crawl_size = crawl_size

# only update last_crawl if no crawls running, otherwise
# lastCrawl* stats are already for running crawl
running_crawl = await crawl_configs.get_running_crawl(cid)

if last_crawl and not running_crawl:
update_query["totalSize"] = total_size
update_query["crawlSuccessfulCount"] = successful_count

update_query["lastCrawlId"] = str(last_crawl.get("_id"))
update_query["lastCrawlStartTime"] = last_crawl.get("started")
update_query["lastStartedBy"] = last_crawl.get("userid")
update_query["lastStartedByName"] = last_crawl.get("userName")
update_query["lastCrawlState"] = last_crawl.get("state")
update_query["lastCrawlSize"] = last_crawl_size
update_query["lastCrawlStats"] = last_crawl.get("stats")
update_query["lastCrawlStopping"] = False
update_query["isCrawlRunning"] = False

last_crawl_finished = last_crawl.get("finished")
update_query["lastCrawlTime"] = last_crawl_finished

if last_crawl_finished:
update_query["lastRun"] = last_crawl_finished
# always update these
update_query["crawlSuccessfulCount"] = successful_count
update_query["totalSize"] = total_size

# only update last_crawl if no crawls running, otherwise
# lastCrawl* stats are already for running crawl
running_crawl = await crawl_config_ops.get_running_crawl(cid)

if last_crawl and not running_crawl:
update_query["lastCrawlId"] = str(last_crawl.get("_id"))
update_query["lastCrawlStartTime"] = last_crawl.get("started")
update_query["lastStartedBy"] = last_crawl.get("userid")
update_query["lastStartedByName"] = last_crawl.get("userName")
update_query["lastCrawlState"] = last_crawl.get("state")
update_query["lastCrawlSize"] = last_crawl_size
update_query["lastCrawlStats"] = last_crawl.get("stats")
update_query["lastCrawlStopping"] = False
update_query["isCrawlRunning"] = False

last_crawl_finished = last_crawl.get("finished")
update_query["lastCrawlTime"] = last_crawl_finished

if last_crawl_finished:
update_query["lastRun"] = last_crawl_finished

elif not last_crawl and not running_crawl:
# ensure all last crawl data is cleared
update_query["lastCrawlId"] = None
update_query["lastCrawlStartTime"] = None
update_query["lastStartedBy"] = None
update_query["lastStartedByName"] = None
update_query["lastCrawlTime"] = None
update_query["lastCrawlState"] = None
update_query["lastCrawlSize"] = 0
update_query["lastCrawlStats"] = None
update_query["lastCrawlStopping"] = False
update_query["isCrawlRunning"] = False
update_query["lastRun"] = None

result = await crawl_configs.find_one_and_update(
{"_id": cid, "inactive": {"$ne": True}},
{"$set": update_query},
return_document=pymongo.ReturnDocument.AFTER,
)

return result
return result is not None


# ============================================================================
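In the rewritten `stats_recompute_all`, the aggregate counters (`crawlCount`, `totalSize`, `crawlSuccessfulCount`) are now written unconditionally, while the `lastCrawl*` fields are only overwritten (or cleared) when no crawl is running. Counting `state in SUCCESSFUL_STATES` rather than `state not in FAILED_STATES` is the behavioral fix: failed crawls no longer inflate `crawlSuccessfulCount`. A condensed, synchronous sketch of just the aggregation loop (field names match the diff; the helper itself is hypothetical):

```python
from typing import Optional


def summarize_finished_crawls(finished: list, successful_states: frozenset) -> dict:
    """Aggregate stats over finished crawl documents, assumed sorted by "finished"."""
    total_size = 0
    successful_count = 0
    last_crawl: Optional[dict] = None
    last_crawl_size = 0
    for res in finished:
        crawl_size = sum(f.get("size", 0) for f in res.get("files", []))
        total_size += crawl_size
        if res["state"] in successful_states:
            successful_count += 1
        last_crawl = res
        last_crawl_size = crawl_size
    return {
        "crawlCount": len(finished),
        "totalSize": total_size,
        "crawlSuccessfulCount": successful_count,
        "lastCrawl": last_crawl,  # None when there are no finished crawls
        "lastCrawlSize": last_crawl_size,
    }
```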
backend/btrixcloud/crawls.py (9 changes: 7 additions & 2 deletions)
@@ -398,7 +398,10 @@ async def delete_crawls(
for cid, cid_dict in cids_to_update.items():
cid_size = cid_dict["size"]
cid_inc = cid_dict["inc"]
await self.crawl_configs.stats_recompute_last(cid, -cid_size, -cid_inc)
cid_successful = cid_dict["successful"]
await self.crawl_configs.stats_recompute_last(
cid, -cid_size, -cid_inc, -cid_successful
)

return count, cids_to_update, quota_reached

@@ -896,7 +899,9 @@ async def shutdown_crawl(
if not graceful:
await self.update_crawl_state(crawl_id, "canceled")
crawl = await self.get_crawl(crawl_id, org)
if not await self.crawl_configs.stats_recompute_last(crawl.cid, 0, -1):
if not await self.crawl_configs.stats_recompute_last(
crawl.cid, 0, -1, 0
):
raise HTTPException(
status_code=404,
detail=f"crawl_config_not_found: {crawl.cid}",
backend/btrixcloud/db.py (5 changes: 4 additions & 1 deletion)
@@ -38,7 +38,7 @@
) = object


CURR_DB_VERSION = "0054"
CURR_DB_VERSION = "0055"


# ============================================================================
@@ -128,6 +128,7 @@ async def update_and_prepare_db(
coll_ops,
file_ops,
crawl_log_ops,
crawl_config_ops,
crawl_manager,
):
await drop_indexes(mdb)
@@ -164,6 +165,7 @@ async def run_db_migrations(
coll_ops: CollectionOps,
file_ops: FileUploadOps,
crawl_log_ops: CrawlLogOps,
crawl_config_ops: CrawlConfigOps,
crawl_manager: CrawlManager,
):
"""Run database migrations."""
@@ -205,6 +207,7 @@ async def run_db_migrations(
coll_ops=coll_ops,
file_ops=file_ops,
crawl_log_ops=crawl_log_ops,
crawl_config_ops=crawl_config_ops,
crawl_manager=crawl_manager,
)
if await migration.run():
@@ -16,6 +16,8 @@ class Migration(BaseMigration):
def __init__(self, mdb, **kwargs):
super().__init__(mdb, migration_version=MIGRATION_VERSION)

self.crawl_config_ops = kwargs.get("crawl_config_ops")

async def migrate_up(self):
"""Perform migration up.

@@ -26,10 +28,19 @@ async def migrate_up(self):
crawl_configs = self.mdb["crawl_configs"]
crawls = self.mdb["crawls"]

if self.crawl_config_ops is None:
print(
f"Unable to set run migration {MIGRATION_VERSION}, missing crawl_config_ops",
flush=True,
)
return

async for config in crawl_configs.find({"inactive": {"$ne": True}}):
config_id = config["_id"]
try:
await stats_recompute_all(crawl_configs, crawls, config_id)
await stats_recompute_all(
self.crawl_config_ops, crawl_configs, crawls, config_id
)
# pylint: disable=broad-exception-caught
except Exception as err:
print(f"Unable to update workflow {config_id}: {err}", flush=True)
@@ -19,17 +19,28 @@ class Migration(BaseMigration):
def __init__(self, mdb, **kwargs):
super().__init__(mdb, migration_version=MIGRATION_VERSION)

self.crawl_config_ops = kwargs.get("crawl_config_ops")

async def migrate_up(self):
"""Perform migration up."""
# pylint: disable=duplicate-code
crawl_configs = self.mdb["crawl_configs"]
crawls = self.mdb["crawls"]

if self.crawl_config_ops is None:
print(
f"Unable to set run migration {MIGRATION_VERSION}, missing crawl_config_ops",
flush=True,
)
return

# Update workflows crawl stats to populate crawlSuccessfulCount
async for config in crawl_configs.find({"inactive": {"$ne": True}}):
config_id = config["_id"]
try:
await stats_recompute_all(crawl_configs, crawls, config_id)
await stats_recompute_all(
self.crawl_config_ops, crawl_configs, crawls, config_id
)
# pylint: disable=broad-exception-caught
except Exception as err:
print(f"Unable to update workflow {config_id}: {err}", flush=True)
@@ -0,0 +1,48 @@
"""
Migration 0055 - Recompute workflow crawl stats
"""

from motor.motor_asyncio import AsyncIOMotorDatabase

from btrixcloud.crawlconfigs import stats_recompute_all
from btrixcloud.migrations import BaseMigration


MIGRATION_VERSION = "0055"


class Migration(BaseMigration):
"""Migration class."""

# pylint: disable=unused-argument
def __init__(self, mdb: AsyncIOMotorDatabase, **kwargs):
super().__init__(mdb, migration_version=MIGRATION_VERSION)

self.crawl_config_ops = kwargs.get("crawl_config_ops")

async def migrate_up(self):
"""Perform migration up.

Recompute crawl workflow stats to fix issue with failed crawls
being added to successfulCrawlCount and workflow size totals.
"""
# pylint: disable=duplicate-code
crawl_configs = self.mdb["crawl_configs"]
crawls = self.mdb["crawls"]

if self.crawl_config_ops is None:
print(
f"Unable to set run migration {MIGRATION_VERSION}, missing crawl_config_ops",
flush=True,
)
return

async for config in crawl_configs.find({"inactive": {"$ne": True}}):
config_id = config["_id"]
try:
await stats_recompute_all(
self.crawl_config_ops, crawl_configs, crawls, config_id
)
# pylint: disable=broad-exception-caught
except Exception as err:
print(f"Unable to update workflow {config_id}: {err}", flush=True)
backend/btrixcloud/operator/crawls.py (8 changes: 4 additions & 4 deletions)
@@ -1753,11 +1753,10 @@ async def do_crawl_finished_tasks(
stats: Optional[OpCrawlStats],
) -> None:
"""Run tasks after crawl completes in asyncio.task coroutine."""
await self.crawl_config_ops.stats_recompute_last(
crawl.cid, status.filesAddedSize, 1
)

if state in SUCCESSFUL_STATES and crawl.oid:
await self.crawl_config_ops.stats_recompute_last(
crawl.cid, status.filesAddedSize, 1, 1
)
await self.page_ops.set_archived_item_page_counts(crawl.id)
await self.org_ops.set_last_crawl_finished(crawl.oid)
await self.coll_ops.add_successful_crawl_to_collections(
@@ -1774,6 +1773,7 @@
)

if state in FAILED_STATES:
await self.crawl_config_ops.stats_recompute_last(crawl.cid, 0, 1, 0)
await self.crawl_ops.delete_failed_crawl_files(crawl.id, crawl.oid)
await self.page_ops.delete_crawl_pages(crawl.id, crawl.oid)

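With this change the operator updates workflow stats according to the terminal state: successful crawls contribute their uploaded size and bump both counters, while failed crawls only bump the total crawl count. A hypothetical condensation of that branching (helper and parameter names are illustrative, and the extra `crawl.oid` guard from the diff is omitted):

```python
# Sketch of the state-dependent update performed in do_crawl_finished_tasks.
async def record_finished_crawl(
    crawl_config_ops, crawl, status, state, successful_states, failed_states
):
    if state in successful_states:
        # size delta, +1 crawl, +1 successful crawl
        await crawl_config_ops.stats_recompute_last(
            crawl.cid, status.filesAddedSize, 1, 1
        )
    elif state in failed_states:
        # no size delta, +1 crawl, +0 successful crawls
        await crawl_config_ops.stats_recompute_last(crawl.cid, 0, 1, 0)
```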