diff --git a/backend/apps/slack/management/commands/slack_sync_data.py b/backend/apps/slack/management/commands/slack_sync_data.py
--- a/backend/apps/slack/management/commands/slack_sync_data.py
+++ b/backend/apps/slack/management/commands/slack_sync_data.py
@@ -1,7 +1,9 @@
-"""A command to populate Slack channels and members data based on workspaces's bot tokens."""
+"""Management command to sync Slack workspaces, channels, and members."""
 
 import logging
+import os
+import secrets
 import time
 
 from django.core.management.base import BaseCommand
 from slack_sdk import WebClient
@@ -10,6 +12,8 @@
 from apps.slack.models import Conversation, Member, Workspace
 
 logger = logging.getLogger(__name__)
+
+MAX_RETRIES = 5
 
 
 class Command(BaseCommand):
@@ -41,90 +45,128 @@ def handle(self, *args, **options):
 
         for workspace in workspaces:
             self.stdout.write(f"\nProcessing workspace: {workspace}")
 
-            if not (bot_token := workspace.bot_token):
+            # Fall back to the shared environment token when the workspace has none.
+            bot_token = (
+                (getattr(workspace, "bot_token", "") or "").strip()
+                or os.environ.get("DJANGO_SLACK_BOT_TOKEN")
+            )
+            if not bot_token:
                 self.stdout.write(self.style.ERROR(f"No bot token found for {workspace}"))
                 continue
 
             client = WebClient(token=bot_token)
 
-            self.stdout.write(f"Fetching conversations for {workspace}...")
-            conversations = []
-            total_channels = 0
-            try:
-                cursor = None
-                while True:
-                    response = client.conversations_list(
-                        cursor=cursor,
-                        exclude_archived=False,
-                        limit=batch_size,
-                        timeout=30,
-                        types="public_channel,private_channel",
-                    )
-                    self._handle_slack_response(response, "conversations_list")
-
-                    conversations.extend(
-                        member
-                        for conversation_data in response["channels"]
-                        if (member := Conversation.update_data(conversation_data, workspace))
-                    )
-                    total_channels += len(response["channels"])
-
-                    if not (cursor := response.get("response_metadata", {}).get("next_cursor")):
-                        break
-
-                    if delay:
-                        time.sleep(delay)
-            except SlackApiError as e:
-                self.stdout.write(
-                    self.style.ERROR(f"Failed to fetch conversations: {e.response['error']}")
-                )
-            if conversations:
-                Conversation.bulk_save(conversations)
-            self.stdout.write(self.style.SUCCESS(f"Populated {total_channels} channels"))
-
-            self.stdout.write(f"Fetching members for {workspace}...")
-            members = []
-            total_members = 0
-            try:
-                cursor = None
-                while True:
-                    response = client.users_list(
-                        cursor=cursor,
-                        limit=batch_size,
-                        timeout=30,
-                    )
-                    self._handle_slack_response(response, "users_list")
-
-                    members.extend(
-                        member
-                        for member_data in response["members"]
-                        if (member := Member.update_data(member_data, workspace))
-                    )
-                    total_members += len(response["members"])
-
-                    cursor = response.get("response_metadata", {}).get("next_cursor")
-                    if not cursor:
-                        break
-            except SlackApiError as e:
-                self.stdout.write(
-                    self.style.ERROR(f"Failed to fetch members: {e.response['error']}")
-                )
-
-            if members:
-                Member.bulk_save(members)
-
-            # Update the workspace with the total members count.
-            workspace.total_members_count = total_members
-            workspace.save(update_fields=["total_members_count"])
-
-            self.stdout.write(self.style.SUCCESS(f"Populated {total_members} members"))
-
-        self.stdout.write(self.style.SUCCESS("\nFinished processing all workspaces"))
-
-    def _handle_slack_response(self, response, api_method):
-        """Handle Slack API response and raise exception if needed."""
-        if not response["ok"]:
-            error_message = f"{api_method} API call failed"
-            logger.error(error_message)
-            self.stdout.write(self.style.ERROR(error_message))
+            self._fetch_conversations(client, workspace, batch_size, delay)
+            self._fetch_members(client, workspace, batch_size, delay)
+
+        self.stdout.write(self.style.SUCCESS("\nFinished processing all workspaces"))
+
+    def _call_slack_api(self, func, *args, **kwargs):
+        """Call a Slack Web API method, retrying while Slack rate-limits it.
+
+        Only HTTP 429 responses are retried: the wait honours the Retry-After
+        header plus up to one second of jitter. Any other SlackApiError, and
+        the final rate-limit error after MAX_RETRIES attempts, is re-raised so
+        the callers' SlackApiError handlers can report it.
+        """
+        attempt = 1
+        while True:
+            try:
+                # WebClient raises SlackApiError itself when Slack answers ok=false.
+                return func(*args, **kwargs)
+            except SlackApiError as e:
+                if e.response.status_code != 429 or attempt >= MAX_RETRIES:
+                    raise
+                retry_after = int(e.response.headers.get("Retry-After", 1))
+                self.stdout.write(
+                    self.style.WARNING(f"Rate limited. Sleeping {retry_after}s...")
+                )
+                time.sleep(retry_after + secrets.randbelow(1000) / 1000)
+                attempt += 1
+
+    def _fetch_conversations(self, client, workspace, batch_size, delay):
+        """Fetch all channels of the workspace and bulk-save them as conversations."""
+        self.stdout.write(f"Fetching conversations for {workspace}...")
+        conversations = []
+        total_channels = 0
+        try:
+            cursor = None
+            while True:
+                response = self._call_slack_api(
+                    client.conversations_list,
+                    cursor=cursor,
+                    exclude_archived=False,
+                    limit=batch_size,
+                    timeout=30,
+                    types="public_channel,private_channel",
+                )
+                for conversation_data in response["channels"]:
+                    if "num_members" not in conversation_data:
+                        channel_id = conversation_data["id"]
+                        try:
+                            info = self._call_slack_api(
+                                client.conversations_info,
+                                channel=channel_id,
+                                include_num_members=True,
+                            )
+                            conversation_data["num_members"] = info["channel"].get("num_members")
+                        except SlackApiError as e:
+                            self.stdout.write(
+                                self.style.WARNING(
+                                    f"Failed to get member count for {channel_id}: {e}"
+                                )
+                            )
+                            conversation_data["num_members"] = None
+
+                    if conversation := Conversation.update_data(conversation_data, workspace):
+                        conversations.append(conversation)
+
+                total_channels += len(response["channels"])
+                cursor = response.get("response_metadata", {}).get("next_cursor")
+                if not cursor:
+                    break
+                if delay:
+                    time.sleep(delay)
+        except SlackApiError as e:
+            self.stdout.write(self.style.ERROR(f"Error fetching conversations: {e}"))
+
+        if conversations:
+            Conversation.bulk_save(conversations)
+        self.stdout.write(self.style.SUCCESS(f"Populated {total_channels} channels"))
+
+    def _fetch_members(self, client, workspace, batch_size, delay):
+        """Fetch all users of the workspace, bulk-save them and store their total."""
+        self.stdout.write(f"Fetching members for {workspace}...")
+        members = []
+        total_members = 0
+        try:
+            cursor = None
+            while True:
+                response = self._call_slack_api(
+                    client.users_list,
+                    cursor=cursor,
+                    limit=batch_size,
+                    timeout=30,
+                )
+                members.extend(
+                    member
+                    for member_data in response["members"]
+                    if (member := Member.update_data(member_data, workspace))
+                )
+                total_members += len(response["members"])
+                cursor = response.get("response_metadata", {}).get("next_cursor")
+                if not cursor:
+                    break
+                if delay:
+                    time.sleep(delay)
+        except SlackApiError as e:
+            self.stdout.write(self.style.ERROR(f"Error fetching members: {e}"))
+
+        if members:
+            Member.bulk_save(members)
+            # The inline implementation this replaces stored the member total;
+            # keep doing so now that the field is re-added by migration 0017.
+            workspace.total_members_count = total_members
+            workspace.save(update_fields=["total_members_count"])
+        self.stdout.write(self.style.SUCCESS(f"Populated {total_members} members"))
diff --git a/backend/apps/slack/migrations/0014_remove_workspace_total_members_count.py b/backend/apps/slack/migrations/0014_remove_workspace_total_members_count.py
new file mode 100644
index 0000000000..217934ae66
--- /dev/null
+++ b/backend/apps/slack/migrations/0014_remove_workspace_total_members_count.py
@@ -0,0 +1,17 @@
+# Generated by Django 5.2.1 on 2025-06-11 08:36
+
+from django.db import migrations
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('slack', '0013_alter_conversation_total_members_count_and_more'),
+    ]
+
+    operations = [
+        migrations.RemoveField(
+            model_name='workspace',
+            name='total_members_count',
+        ),
+    ]
diff --git a/backend/apps/slack/migrations/0017_workspace_total_members_count.py b/backend/apps/slack/migrations/0017_workspace_total_members_count.py
new file mode 100644
index 0000000000..7e713221e6
--- /dev/null
+++ b/backend/apps/slack/migrations/0017_workspace_total_members_count.py
@@ -0,0 +1,18 @@
+# Generated by Django 5.2.3 on 2025-06-18 11:51
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('slack', '0016_merge_20250618_1130'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='workspace',
+            name='total_members_count',
+            field=models.IntegerField(blank=True, null=True),
+        ),
+    ]
diff --git a/backend/apps/slack/tests/test_slack_sync_data.py b/backend/apps/slack/tests/test_slack_sync_data.py
new file mode 100644
--- /dev/null
+++ b/backend/apps/slack/tests/test_slack_sync_data.py
@@ -0,0 +1,105 @@
+"""Tests for the slack_sync_data management command."""
+
+import os
+from unittest.mock import MagicMock, patch
+
+from django.core.management import call_command
+from django.test import TestCase
+from slack_sdk.errors import SlackApiError
+
+from apps.slack.models import Conversation, Member, Workspace
+
+COMMAND_MODULE = "apps.slack.management.commands.slack_sync_data"
+
+
+class SlackSyncDataCommandTests(TestCase):
+    def setUp(self):
+        # Patch WebClient where the command looks it up so no HTTP call is made.
+        self.mock_web_client = MagicMock()
+        self._start(patch(f"{COMMAND_MODULE}.WebClient", return_value=self.mock_web_client))
+
+        # Token the command falls back to when the workspace provides none.
+        self._start(patch.dict(os.environ, {"DJANGO_SLACK_BOT_TOKEN": "xoxb-test-token"}))
+
+        # Stub model persistence: the test checks the sync flow, not the models.
+        self.mock_conv_update = self._start(
+            patch.object(Conversation, "update_data", return_value=MagicMock())
+        )
+        self.mock_conv_bulk = self._start(patch.object(Conversation, "bulk_save"))
+        self.mock_member_update = self._start(
+            patch.object(Member, "update_data", return_value=MagicMock())
+        )
+        self.mock_member_bulk = self._start(patch.object(Member, "bulk_save"))
+
+        self.workspace = Workspace.objects.create(
+            name="Test Workspace",
+            slack_workspace_id="TEST123",
+        )
+
+        self.mock_web_client.conversations_list.return_value = {
+            "ok": True,
+            "channels": [
+                {"id": "C1", "name": "general", "num_members": 5},
+                # No num_members: the command must ask conversations.info for it.
+                {"id": "C2", "name": "random"},
+            ],
+            "response_metadata": {"next_cursor": ""},
+        }
+        self.mock_web_client.conversations_info.return_value = {
+            "ok": True,
+            "channel": {"num_members": 10},
+        }
+        # Two pages, to exercise cursor-based pagination.
+        self.mock_web_client.users_list.side_effect = [
+            {
+                "ok": True,
+                "members": [{"id": "U1"}, {"id": "U2"}],
+                "response_metadata": {"next_cursor": "next-123"},
+            },
+            {
+                "ok": True,
+                "members": [{"id": "U3"}],
+                "response_metadata": {"next_cursor": ""},
+            },
+        ]
+
+    def _start(self, patcher):
+        """Start the patcher and register its stop as a test cleanup."""
+        started = patcher.start()
+        self.addCleanup(patcher.stop)
+        return started
+
+    def test_command_syncs_mocked_slack_data(self):
+        call_command("slack_sync_data", batch_size=100, delay=0)
+
+        self.assertEqual(self.mock_web_client.conversations_list.call_count, 1)
+        self.mock_web_client.conversations_info.assert_called_once_with(
+            channel="C2",
+            include_num_members=True,
+        )
+        self.assertEqual(self.mock_web_client.users_list.call_count, 2)
+
+        self.assertEqual(self.mock_conv_update.call_count, 2)
+        self.mock_conv_bulk.assert_called_once()
+        self.assertEqual(self.mock_member_update.call_count, 3)
+        self.mock_member_bulk.assert_called_once()
+
+        self.workspace.refresh_from_db()
+        self.assertEqual(self.workspace.total_members_count, 3)
+
+    def test_rate_limited_call_is_retried(self):
+        rate_limited = SlackApiError(
+            message="ratelimited",
+            response=MagicMock(status_code=429, headers={"Retry-After": "0"}),
+        )
+        self.mock_web_client.conversations_list.side_effect = [
+            rate_limited,
+            self.mock_web_client.conversations_list.return_value,
+        ]
+
+        with patch(f"{COMMAND_MODULE}.time.sleep") as mock_sleep:
+            call_command("slack_sync_data", batch_size=100, delay=0)
+
+        self.assertEqual(self.mock_web_client.conversations_list.call_count, 2)
+        mock_sleep.assert_called_once()
+        self.mock_conv_bulk.assert_called_once()