diff --git a/backend/apps/slack/admin.py b/backend/apps/slack/admin.py index 89ce76dd4b..44687f3dbc 100644 --- a/backend/apps/slack/admin.py +++ b/backend/apps/slack/admin.py @@ -90,6 +90,10 @@ class MemberAdmin(admin.ModelAdmin): actions = ("approve_suggested_users",) autocomplete_fields = ("user",) filter_horizontal = ("suggested_users",) + list_filter = ( + "is_bot", + "workspace", + ) search_fields = ( "slack_user_id", "username", @@ -129,15 +133,24 @@ def approve_suggested_users(self, request, queryset): class MessageAdmin(admin.ModelAdmin): - autocomplete_fields = ("author", "conversation", "parent_message") + autocomplete_fields = ( + "author", + "conversation", + "parent_message", + ) list_display = ( - "text", + "created_at", "has_replies", "author", + "conversation", + ) + list_filter = ( + "has_replies", + "conversation", ) search_fields = ( "slack_message_id", - "text", + "raw_data__text", ) diff --git a/backend/apps/slack/management/commands/slack_sync_messages.py b/backend/apps/slack/management/commands/slack_sync_messages.py index 01cf8aac67..473391e280 100644 --- a/backend/apps/slack/management/commands/slack_sync_messages.py +++ b/backend/apps/slack/management/commands/slack_sync_messages.py @@ -20,7 +20,7 @@ def add_arguments(self, parser): parser.add_argument( "--batch-size", type=int, - default=200, + default=999, help="Number of messages to retrieve per request", ) parser.add_argument( @@ -93,29 +93,18 @@ def _fetch_conversation( self.stdout.write(f"\nProcessing channel: {conversation.name}") try: - messages = self._fetch_messages( + self._fetch_messages( + batch_size=batch_size, client=client, conversation=conversation, - batch_size=batch_size, delay=delay, + include_replies=include_replies, max_retries=max_retries, ) - if include_replies: - for message in messages: - self._fetch_replies( - client=client, - conversation=conversation, - message=message, - delay=delay, - max_retries=max_retries, - ) - time.sleep(delay) - self.stdout.write( self.style.SUCCESS(f"Finished processing messages from {conversation.name}") ) - except SlackApiError as e: self.stdout.write( self.style.ERROR( @@ -125,64 +114,45 @@ def _fetch_conversation( def _fetch_messages( self, + batch_size: int, client: WebClient, conversation: Conversation, - batch_size: int, delay: float, max_retries: int, - ) -> list[Message]: + *, + include_replies: bool = True, + ) -> None: """Fetch all parent messages (non-thread) for a conversation.""" cursor = None has_more = True - batch_messages = [] - all_threaded_parents = [] - retry_count = 0 - - latest_message = ( - Message.objects.filter(conversation=conversation).order_by("-created_at").first() - ) - while has_more: try: + retry_count = 0 response = client.conversations_history( channel=conversation.slack_channel_id, cursor=cursor, limit=batch_size, - oldest=latest_message.created_at.timestamp() if latest_message else None, + oldest=latest_message.created_at.timestamp() + if (latest_message := conversation.latest_message) + else None, ) self._handle_slack_response(response, "conversations_history") - for message_data in response.get("messages", []): - if message_data.get("thread_ts") and message_data.get( - "ts" - ) != message_data.get("thread_ts"): - continue - - message = self._create_message_from_data( - client=client, - conversation=conversation, - delay=delay, - max_retries=max_retries, - message_data=message_data, + messages = [ + message + for message_data in response.get("messages", []) + if ( + message := self._create_message( + client=client, + conversation=conversation, + delay=delay, + max_retries=max_retries, + message_data=message_data, + ) ) - - if message: - batch_messages.append(message) - if message.has_replies: - all_threaded_parents.append(message) - - if batch_messages: - Message.bulk_save(batch_messages) - batch_messages = [] - + ] cursor = response.get("response_metadata", {}).get("next_cursor") has_more = bool(cursor) - - if delay and has_more: - time.sleep(delay) - - retry_count = 0 - except SlackApiError as e: if e.response["error"] == "ratelimited": if retry_count >= max_retries: @@ -200,12 +170,25 @@ def _fetch_messages( ) time.sleep(retry_after) continue + self.stdout.write( self.style.ERROR(f"Error fetching messages: {e.response['error']}") ) break - return all_threaded_parents + Message.bulk_save(messages.copy()) + if include_replies: + for message in messages: + if not message.has_replies: + continue + + self._fetch_replies( + client=client, + conversation=conversation, + message=message, + delay=delay, + max_retries=max_retries, + ) def _fetch_replies( self, @@ -216,38 +199,23 @@ def _fetch_replies( max_retries: int, ): """Fetch all thread replies for parent messages.""" - if not message: - return - - replies_to_save = [] - + replies = [] try: - latest_reply = ( - Message.objects.filter( - conversation=conversation, - parent_message=message, - ) - .order_by("-created_at") - .first() - ) - oldest_ts = latest_reply.created_at.timestamp() if latest_reply else None - cursor = None has_more = True - retry_count = 0 - while has_more: + retry_count = 0 try: params = { "channel": conversation.slack_channel_id, - "ts": message.slack_message_id, "cursor": cursor, - "limit": 100, - "inclusive": True, + "inclusive": False, + "limit": 1000, + "oldest": latest_reply.created_at.timestamp() + if (latest_reply := message.latest_reply) + else 0, + "ts": message.slack_message_id, } - if oldest_ts: - params["oldest"] = str(oldest_ts) - response = client.conversations_replies(**params) self._handle_slack_response(response, "conversations_replies") @@ -255,26 +223,23 @@ def _fetch_replies( if not messages_in_response: break - for reply_data in messages_in_response[1:]: - reply = self._create_message_from_data( - client=client, - message_data=reply_data, - conversation=conversation, - delay=delay, - max_retries=max_retries, - parent_message=message, + replies = [ + reply + for reply_data in messages_in_response + if ( + reply := self._create_message( + client=client, + conversation=conversation, + delay=delay, + max_retries=max_retries, + message_data=reply_data, + parent_message=message, + ) ) - if reply: - replies_to_save.append(reply) + ] cursor = response.get("response_metadata", {}).get("next_cursor") has_more = bool(cursor) - - if delay and has_more: - time.sleep(delay) - - retry_count = 0 - except SlackApiError as e: if e.response["error"] == "ratelimited": if retry_count >= max_retries: @@ -297,7 +262,6 @@ def _fetch_replies( time.sleep(retry_after) continue raise - except SlackApiError as e: self.stdout.write( self.style.ERROR( @@ -305,13 +269,9 @@ def _fetch_replies( ) ) - if replies_to_save: - batch_size = 1000 - for i in range(0, len(replies_to_save), batch_size): - batch = replies_to_save[i : i + batch_size] - Message.bulk_save(batch) + Message.bulk_save(replies) - def _create_message_from_data( + def _create_message( self, client: WebClient, message_data: dict, @@ -322,85 +282,67 @@ def _create_message_from_data( parent_message: Message | None = None, ) -> Message | None: """Create Message instance using from_slack pattern.""" - if message_data.get("subtype") in { - "channel_join", - "channel_leave", - "bot_message", - } or not any( - [ - message_data.get("text"), - message_data.get("attachments"), - message_data.get("files"), - message_data.get("blocks"), - ] - ): - return None - - try: - if not (slack_user_id := (message_data.get("user") or message_data.get("bot_id"))): - return None - + author = None + slack_user_id = message_data.get("user") or message_data.get("bot_id") + if slack_user_id: try: author = Member.objects.get( - slack_user_id=slack_user_id, workspace=conversation.workspace + slack_user_id=slack_user_id, + workspace=conversation.workspace, ) except Member.DoesNotExist: - author = None retry_count = 0 - while retry_count < max_retries: try: - time.sleep(delay) + # User. + if message_data.get("user"): + user_info = client.users_info(user=slack_user_id) + self._handle_slack_response(user_info, "users_info") - user_info = client.users_info(user=slack_user_id) - self._handle_slack_response(user_info, "users_info") - - author = Member.update_data( - user_info["user"], conversation.workspace, save=True - ) - self.stdout.write( - self.style.SUCCESS(f"Created new member: {slack_user_id}") - ) - break - except SlackApiError as e: - if e.response["error"] == "ratelimited": - retry_after = int( - e.response.headers.get("Retry-After", delay * (retry_count + 1)) + author = Member.update_data( + user_info["user"], conversation.workspace, save=True ) - - retry_count += 1 self.stdout.write( - self.style.WARNING( - f"Rate limited on user info. Retrying after {retry_after}s" - ) + self.style.SUCCESS(f"Created a new member: {slack_user_id}") + ) + # Bot. + else: + bot_info = client.bots_info(bot=slack_user_id) + self._handle_slack_response(bot_info, "bots_info") + bot_data = { + "id": slack_user_id, + "is_bot": True, + "name": bot_info["bot"].get("name"), + "real_name": bot_info["bot"].get("name"), + } + author = Member.update_data( + bot_data, conversation.workspace, save=True + ) + self.stdout.write( + self.style.SUCCESS(f"Created bot member: {slack_user_id}") ) + break + except SlackApiError as e: + if e.response.get("error") == "ratelimited": + retry_after = int(e.response.headers.get("Retry-After", delay)) + retry_count += 1 + self.stdout.write(self.style.WARNING("Rate limited on member info")) time.sleep(retry_after) else: self.stdout.write( - self.style.WARNING( - f"Failed to fetch user data for {slack_user_id}" + self.style.ERROR( + f"Failed to fetch member data for {slack_user_id}" ) ) - return None - - if not author: - self.stdout.write( - self.style.WARNING( - f"Could not fetch user {slack_user_id}, skipping message" - ) - ) - return None + break - return Message.update_data( - data=message_data, - conversation=conversation, - author=author, - parent_message=parent_message, - save=False, - ) - except Exception: - logger.exception("Error creating message from data") - return None + return Message.update_data( + data=message_data, + conversation=conversation, + author=author, + parent_message=parent_message, + save=False, + ) def _handle_slack_response(self, response, api_method): """Handle Slack API response and raise exception if needed.""" diff --git a/backend/apps/slack/migrations/0016_message_raw_data_alter_message_author.py b/backend/apps/slack/migrations/0016_message_raw_data_alter_message_author.py new file mode 100644 index 0000000000..7a51b03215 --- /dev/null +++ b/backend/apps/slack/migrations/0016_message_raw_data_alter_message_author.py @@ -0,0 +1,29 @@ +# Generated by Django 5.2.3 on 2025-06-18 18:38 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("slack", "0015_remove_message_is_thread_parent_message_has_replies_and_more"), + ] + + operations = [ + migrations.AddField( + model_name="message", + name="raw_data", + field=models.JSONField(default=dict, verbose_name="Raw data"), + ), + migrations.AlterField( + model_name="message", + name="author", + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="messages", + to="slack.member", + ), + ), + ] diff --git a/backend/apps/slack/migrations/0017_remove_message_text.py b/backend/apps/slack/migrations/0017_remove_message_text.py new file mode 100644 index 0000000000..9e65743dff --- /dev/null +++ b/backend/apps/slack/migrations/0017_remove_message_text.py @@ -0,0 +1,16 @@ +# Generated by Django 5.2.3 on 2025-06-21 02:12 + +from django.db import migrations + + +class Migration(migrations.Migration): + dependencies = [ + ("slack", "0016_message_raw_data_alter_message_author"), + ] + + operations = [ + migrations.RemoveField( + model_name="message", + name="text", + ), + ] diff --git a/backend/apps/slack/models/conversation.py b/backend/apps/slack/models/conversation.py index 3f273adf90..1d779ce8e6 100644 --- a/backend/apps/slack/models/conversation.py +++ b/backend/apps/slack/models/conversation.py @@ -1,12 +1,16 @@ """Slack app conversation model.""" from datetime import UTC, datetime +from typing import TYPE_CHECKING from django.db import models from apps.common.models import BulkSaveModel, TimestampedModel from apps.slack.models.workspace import Workspace +if TYPE_CHECKING: # pragma: no cover + from apps.slack.models.message import Message + class Conversation(TimestampedModel): """Slack Conversation model.""" @@ -36,6 +40,11 @@ def __str__(self): """Channel human readable representation.""" return f"{self.name} - {self.workspace}" + @property + def latest_message(self) -> "Message | None": + """Get the latest message in the conversation.""" + return self.messages.order_by("-created_at").first() + def from_slack(self, conversation_data, workspace: Workspace) -> None: """Update instance based on Slack conversation data.""" self.created_at = datetime.fromtimestamp(int(conversation_data.get("created", 0)), tz=UTC) diff --git a/backend/apps/slack/models/member.py b/backend/apps/slack/models/member.py index 23fdf9a4ed..1246648748 100644 --- a/backend/apps/slack/models/member.py +++ b/backend/apps/slack/models/member.py @@ -49,7 +49,7 @@ def __str__(self): def from_slack(self, member_data, workspace: Workspace) -> None: """Update instance based on Slack member data.""" - self.email = member_data["profile"].get("email", "") + self.email = member_data.get("profile", {}).get("email", "") self.is_bot = member_data["is_bot"] self.real_name = member_data.get("real_name", "") self.slack_user_id = member_data["id"] diff --git a/backend/apps/slack/models/message.py b/backend/apps/slack/models/message.py index 9307355097..03cdc70201 100644 --- a/backend/apps/slack/models/message.py +++ b/backend/apps/slack/models/message.py @@ -20,11 +20,13 @@ class Meta: created_at = models.DateTimeField(verbose_name="Created at") has_replies = models.BooleanField(verbose_name="Has replies", default=False) + raw_data = models.JSONField(verbose_name="Raw data", default=dict) slack_message_id = models.CharField(verbose_name="Slack message ID", max_length=50) - text = models.TextField(verbose_name="Text") # FKs. - author = models.ForeignKey(Member, on_delete=models.CASCADE, related_name="messages") + author = models.ForeignKey( + Member, on_delete=models.CASCADE, related_name="messages", blank=True, null=True + ) conversation = models.ForeignKey( Conversation, on_delete=models.CASCADE, related_name="messages" ) @@ -38,21 +40,38 @@ class Meta: def __str__(self): """Human readable representation.""" - return truncate(self.text, 50) + return ( + f"{self.raw_data['channel']} huddle" + if self.raw_data.get("subtype") == "huddle_thread" + else truncate(self.raw_data["text"], 50) + ) + + @property + def latest_reply(self) -> "Message | None": + """Get the latest reply to this message.""" + return ( + Message.objects.filter( + conversation=self.conversation, + parent_message=self, + ) + .order_by("-created_at") + .first() + ) def from_slack( self, message_data: dict, conversation: Conversation, - author: Member, + author: "Member | None" = None, *, parent_message: "Message | None" = None, ) -> None: """Update instance based on Slack message data.""" self.created_at = datetime.fromtimestamp(float(message_data["ts"]), tz=UTC) self.has_replies = message_data.get("reply_count", 0) > 0 + self.is_bot = message_data.get("bot_id") is not None + self.raw_data = message_data self.slack_message_id = message_data.get("ts", "") - self.text = message_data.get("text", "") self.author = author self.conversation = conversation @@ -67,7 +86,7 @@ def bulk_save(messages: list["Message"], fields=None) -> None: def update_data( data: dict, conversation: Conversation, - author: Member, + author: Member | None = None, *, parent_message: "Message | None" = None, save: bool = True, diff --git a/backend/tests/slack/commands/management/slack_sync_messages_test.py b/backend/tests/slack/commands/management/slack_sync_messages_test.py index 0d19ca78d5..1a985ae8cf 100644 --- a/backend/tests/slack/commands/management/slack_sync_messages_test.py +++ b/backend/tests/slack/commands/management/slack_sync_messages_test.py @@ -231,69 +231,44 @@ def test_handle_successful_sync( assert "Processing channel: general" in output assert "Finished processing all workspaces" in output - def test_create_message_from_data_channel_join_subtype(self, command, mock_conversation): - """Test _create_message_from_data with channel_join subtype.""" + def test_create_message_no_user_or_bot(self, command, mock_conversation): + """Test _create_message when no user or bot_id is provided.""" message_data = { "ts": TEST_MESSAGE_TS, - "subtype": "channel_join", - "text": "User joined channel", - } - - mock_client = Mock() - result = command._create_message_from_data( - client=mock_client, - message_data=message_data, - conversation=mock_conversation, - delay=0.5, - max_retries=5, - parent_message=None, - ) - - assert result is None - - def test_create_message_from_data_no_content(self, command, mock_conversation): - """Test _create_message_from_data with no text, attachments, or files.""" - message_data = { - "ts": TEST_MESSAGE_TS, - "user": "U12345", + "text": "Hello world!", } mock_client = Mock() - result = command._create_message_from_data( - client=mock_client, - message_data=message_data, - conversation=mock_conversation, - delay=0.5, - max_retries=5, - parent_message=None, - ) - - assert result is None + mock_message = Mock(spec=Message) - def test_create_message_from_data_no_user(self, command, mock_conversation): - """Test _create_message_from_data with no user or bot_id.""" - message_data = { - "ts": TEST_MESSAGE_TS, - "text": "Hello world!", + mock_client.bots_info.return_value = { + "ok": True, + "bot": { + "id": "B12345", + "name": "TestBot", + }, } - mock_client = Mock() - result = command._create_message_from_data( - client=mock_client, - message_data=message_data, - conversation=mock_conversation, - delay=0.5, - max_retries=5, - parent_message=None, - ) + with ( + patch.object(Message, "update_data", return_value=mock_message), + patch.object(Member, "update_data", return_value=Mock(spec=Member)), + ): + result = command._create_message( + client=mock_client, + message_data=message_data, + conversation=mock_conversation, + delay=0.5, + max_retries=5, + parent_message=None, + ) - assert result is None + assert result is not None @patch("apps.slack.management.commands.slack_sync_messages.time.sleep") - def test_create_message_from_data_member_not_found( + def test_create_message_member_not_found( self, mock_sleep, command, mock_conversation, mock_user_info_response ): - """Test _create_message_from_data when member is not found.""" + """Test _create_message when member is not found.""" message_data = { "ts": TEST_MESSAGE_TS, "text": "Hello world!", @@ -302,15 +277,16 @@ def test_create_message_from_data_member_not_found( mock_client = Mock() mock_client.users_info.return_value = mock_user_info_response + mock_member = Mock(spec=Member) stdout = StringIO() with ( patch.object(Member.objects, "get", side_effect=Member.DoesNotExist), - patch.object(Member, "update_data", return_value=Mock(spec=Member)), + patch.object(Member, "update_data", return_value=mock_member), patch.object(Message, "update_data", return_value=Mock(spec=Message)), ): command.stdout = stdout - result = command._create_message_from_data( + result = command._create_message( client=mock_client, message_data=message_data, conversation=mock_conversation, @@ -321,13 +297,13 @@ def test_create_message_from_data_member_not_found( assert result is not None output = stdout.getvalue() - assert "Created new member: U12345" in output + assert "Created a new member: U12345" in output @patch("apps.slack.management.commands.slack_sync_messages.Message.update_data") - def test_create_message_from_data_regular_message( + def test_create_message_regular_message( self, mock_update_data, command, mock_conversation, mock_member ): - """Test _create_message_from_data with regular message.""" + """Test _create_message with regular message.""" message_data = { "ts": TEST_MESSAGE_TS, "text": "Hello world!", @@ -339,7 +315,7 @@ def test_create_message_from_data_regular_message( mock_client = Mock() with patch.object(Member.objects, "get", return_value=mock_member): - result = command._create_message_from_data( + result = command._create_message( client=mock_client, message_data=message_data, conversation=mock_conversation, @@ -418,7 +394,7 @@ def test_add_arguments(self, command): parser.add_argument.assert_any_call( "--batch-size", type=int, - default=200, + default=999, help="Number of messages to retrieve per request", ) diff --git a/backend/tests/slack/models/message_test.py b/backend/tests/slack/models/message_test.py index 3de025e261..851e1d6a0e 100644 --- a/backend/tests/slack/models/message_test.py +++ b/backend/tests/slack/models/message_test.py @@ -45,7 +45,6 @@ def test_update_data_new_message(self, mocker): assert result is not None assert isinstance(result, Message) assert result.slack_message_id == "123456.789" - assert result.text == "Test message" assert result.conversation == mock_conversation assert result.author == mock_author patched_message_save.assert_called_once() @@ -61,7 +60,6 @@ def test_update_data_existing_message(self, mocker): mock_message_instance = create_model_mock(Message) mock_message_instance.slack_message_id = "123456.789" - mock_message_instance.text = "Updated message" mocker.patch( "apps.slack.models.message.Message.objects.get", @@ -73,7 +71,6 @@ def test_update_data_existing_message(self, mocker): ) assert result is mock_message_instance - assert result.text == "Updated message" mock_message_instance.from_slack.assert_called_once_with( message_data, @@ -110,7 +107,6 @@ def test_update_data_no_save(self, mocker): assert result is not None assert isinstance(result, Message) assert result.slack_message_id == "123456.789" - assert result.text == "Test message" assert result.conversation == mock_conversation assert result.author == mock_author patched_save_method.assert_not_called() @@ -147,7 +143,6 @@ def test_update_data_with_thread_reply(self, mocker): assert result is not None assert isinstance(result, Message) assert result.slack_message_id == "123456.789" - assert result.text == "Reply message" assert result.parent_message == mock_parent assert not result.has_replies patched_message_save.assert_called_once() @@ -179,10 +174,9 @@ def test_update_data_with_thread_parent(self, mocker): assert result is not None assert isinstance(result, Message) assert result.slack_message_id == "123456.789" - assert result.text == "Parent message" assert result.has_replies patched_message_save.assert_called_once() def test_str_method(self): - message = Message(text="Short message") + message = Message(raw_data={"text": "Short message"}) assert str(message) == "Short message"