diff --git a/requirements.pip b/requirements.pip
index e4a815b..dc8ff62 100644
--- a/requirements.pip
+++ b/requirements.pip
@@ -74,4 +74,5 @@ eth-tester
 PyJWT
 djangorestframework-simplejwt
 django-ninja
-numpy
\ No newline at end of file
+numpy
+celery-singleton
diff --git a/stats-backend/api2/migrations/0037_alter_node_version.py b/stats-backend/api2/migrations/0037_alter_node_version.py
new file mode 100644
index 0000000..e8cd7c7
--- /dev/null
+++ b/stats-backend/api2/migrations/0037_alter_node_version.py
@@ -0,0 +1,18 @@
+# Generated by Django 4.1.7 on 2024-10-03 13:45
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('api2', '0036_nodestatushistory_remove_duplicate'),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name='node',
+            name='version',
+            field=models.CharField(blank=True, db_index=True, max_length=7, null=True),
+        ),
+    ]
diff --git a/stats-backend/api2/migrations/0038_node_type.py b/stats-backend/api2/migrations/0038_node_type.py
new file mode 100644
index 0000000..18d75ef
--- /dev/null
+++ b/stats-backend/api2/migrations/0038_node_type.py
@@ -0,0 +1,18 @@
+# Generated by Django 4.1.7 on 2024-10-03 14:09
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('api2', '0037_alter_node_version'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='node',
+            name='type',
+            field=models.CharField(db_index=True, default='provider', max_length=42),
+        ),
+    ]
diff --git a/stats-backend/api2/models.py b/stats-backend/api2/models.py
index 7fe4a5c..22f630c 100644
--- a/stats-backend/api2/models.py
+++ b/stats-backend/api2/models.py
@@ -12,11 +12,12 @@ class Node(models.Model):
     online = models.BooleanField(default=False, db_index=True)
     earnings_total = models.FloatField(null=True, blank=True)
     computing_now = models.BooleanField(default=False, db_index=True)
-    version = models.CharField(max_length=7, db_index=True)
+    version = models.CharField(max_length=7, db_index=True, null=True, blank=True)
     updated_at = models.DateTimeField(auto_now=True, db_index=True)
     created_at = models.DateTimeField(auto_now_add=True, db_index=True)
    uptime_created_at = models.DateTimeField(auto_now_add=True, db_index=True)
     network = models.CharField(max_length=42, default="mainnet", db_index=True)
+    type = models.CharField(max_length=42, default="provider", db_index=True)
 
     def save(self, *args, **kwargs):
         if not self.online:
diff --git a/stats-backend/api2/scanner.py b/stats-backend/api2/scanner.py
index b9cb2f8..cdf6633 100644
--- a/stats-backend/api2/scanner.py
+++ b/stats-backend/api2/scanner.py
@@ -57,7 +57,7 @@ def update_providers_info(node_props):
     new_provider_ids = set(provider_ids) - existing_provider_ids
 
     # Create new Node instances if any
-    new_nodes = [Node(node_id=provider_id) for provider_id in new_provider_ids]
+    new_nodes = [Node(node_id=provider_id, type="provider") for provider_id in new_provider_ids]
     if new_nodes:
         Node.objects.bulk_create(new_nodes)
 
@@ -174,9 +174,10 @@ def update_providers_info(node_props):
         node = existing_nodes_dict[provider_id]
         node.wallet = data.get("wallet")
         node.network = data.get('network', 'mainnet')
+        node.type = "provider"
         nodes_to_update.append(node)
 
     if nodes_to_update:
-        Node.objects.bulk_update(nodes_to_update, ['wallet', 'network', 'updated_at'])
+        Node.objects.bulk_update(nodes_to_update, ['wallet', 'network', 'updated_at', 'type'])
 
     print(f"Done updating {len(provider_ids)} providers")
diff --git a/stats-backend/api2/tasks.py b/stats-backend/api2/tasks.py
index 0e53c65..aa0aca1 100644
--- a/stats-backend/api2/tasks.py
+++ b/stats-backend/api2/tasks.py
@@ -1825,6 +1825,7 @@ def extract_wallets_and_ids():
 from django.db import transaction
 from django.db.models import Case, When, Value, BooleanField
 from .models import NodeStatusHistory, Node
+from django.db import IntegrityError
 
 @app.task
 def bulk_update_node_statuses(nodes_data):
@@ -1836,6 +1837,25 @@ def bulk_update_node_statuses(nodes_data):
     for node_id, is_online in nodes_data:
         latest_status = r.get(f"provider:{node_id}:status")
         if latest_status is None or latest_status.decode() != str(is_online):
+            # Make sure a Node row exists for this id; relay events may
+            # reference requestors the provider scanner never created.
+            try:
+                node, created = Node.objects.get_or_create(
+                    node_id=node_id,
+                    defaults={'online': is_online, 'type': 'requestor'},
+                )
+            except IntegrityError:
+                # Lost a create race with a concurrent worker - fetch the
+                # row that the other transaction just inserted.
+                try:
+                    node = Node.objects.get(node_id=node_id)
+                except Node.DoesNotExist:
+                    print(f"Node {node_id} not found")
+                    node = Node(node_id=node_id, type="requestor")
+                created = False
+            if not created:
+                node.online = is_online
+                node.save()  # one write; freshly created rows are already correct
             status_history_to_create.append(
                 NodeStatusHistory(node_id=node_id, is_online=is_online)
             )
@@ -1859,10 +1879,52 @@
 from .utils import check_node_status
+import aiohttp
+import asyncio
+import json
+from celery.utils.log import get_task_logger
+from celery_singleton import Singleton
+
+
+@app.task(base=Singleton, bind=True, max_retries=None)
+def listen_for_relay_events(self):
+    try:
+        asyncio.run(event_listener())
+    except Exception as exc:
+        print(f"listen_for_relay_events task failed: {exc}")
+        self.retry(countdown=5, exc=exc)  # Retry after 5 seconds
+
+async def event_listener():
+    url = "http://yacn2.dev.golem.network:9000/events"
+    event_type = None  # SSE pairs an 'event:' line with the following 'data:' line
+    async with aiohttp.ClientSession() as session:
+        async with session.get(url) as resp:
+            async for line in resp.content:
+                if line:
+                    try:
+                        decoded = line.decode('utf-8').strip()
+                        if decoded.startswith('event:'):
+                            event_type = decoded.split(':', 1)[1].strip()
+                        elif decoded.startswith('data:') and event_type:
+                            node_id = decoded.split(':', 1)[1].strip()
+                            process_event({'Type': event_type, 'Id': node_id})
+                    except Exception as e:
+                        print(f"Failed to process event: {e}")
+
+def process_event(event):
+    event_type = event.get('Type')
+    node_id = event.get('Id')
+
+    if event_type == 'new-node':
+        print(f"New node: {node_id}")
+        bulk_update_node_statuses.delay([(node_id, True)])
+    elif event_type == 'lost-node':
+        print(f"Lost node: {node_id}")
+        bulk_update_node_statuses.delay([(node_id, False)])
+
 
 
 @app.task
-def fetch_and_update_relay_nodes_online_status():
+def initial_relay_nodes_scan():
     base_url = "http://yacn2.dev.golem.network:9000/nodes/"
-    current_online_nodes = set()
     nodes_to_update = []
 
     for prefix in range(256):
@@ -1874,23 +1936,14 @@
             for node_id, sessions in data.items():
                 node_id = node_id.strip().lower()
                 is_online = bool(sessions) and any('seen' in item for item in sessions if item)
-                current_online_nodes.add(node_id)
                 nodes_to_update.append((node_id, is_online))
         except requests.RequestException as e:
             print(f"Error fetching data for prefix {prefix:02x}: {e}")
 
-    # Bulk update node statuses
     bulk_update_node_statuses.delay(nodes_to_update)
+    listen_for_relay_events.delay()
 
-    # Check providers that were previously online but not found in the current scan
-    previously_online = set(NodeStatusHistory.objects.filter(
-        is_online=True
-    ).order_by('node_id', '-timestamp').distinct('node_id').values_list('node_id', flat=True))
-
-    missing_nodes = previously_online - current_online_nodes
-    if missing_nodes:
-        check_missing_nodes.delay(list(missing_nodes))
 
 
 
 @app.task
diff --git a/stats-backend/core/celery.py b/stats-backend/core/celery.py
index c685939..ba00004 100644
--- a/stats-backend/core/celery.py
+++ b/stats-backend/core/celery.py
@@ -5,6 +5,7 @@
 from celery.schedules import crontab
 
+
 logger = logging.getLogger("Celery")
 
 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
 
@@ -74,17 +75,11 @@ def setup_periodic_tasks(sender, **kwargs):
         daily_volume_golem_vs_chain,
         computing_total_over_time,
         extract_wallets_and_ids,
-        fetch_and_update_relay_nodes_online_status,
+        initial_relay_nodes_scan,
    )
     v2_offer_scraper.apply_async(args=["ray-on-golem-heads"], queue="yagna", routing_key="yagna")
     v2_offer_scraper.apply_async(queue="yagna", routing_key="yagna")
-
-    sender.add_periodic_task(
-        45,
-        fetch_and_update_relay_nodes_online_status.s(),
-        queue="default",
-        options={"queue": "default", "routing_key": "default"},
-    )
+    initial_relay_nodes_scan.delay()
     sender.add_periodic_task(
         60,
         computing_total_over_time.s(),