diff --git a/backend/src/app.module.ts b/backend/src/app.module.ts index 3e3b4f1..0a20b98 100644 --- a/backend/src/app.module.ts +++ b/backend/src/app.module.ts @@ -18,6 +18,7 @@ import { AdminModule } from './modules/admin/admin.module'; import { AnalyticsModule } from './modules/analytics/analytics.module'; import { MCPModule } from './modules/mcp/mcp.module'; import { McpSlackModule } from './modules/mcp-slack/mcp-slack.module'; +import { McpTeamsModule } from './modules/mcp-teams/mcp-teams.module'; @Module({ imports: [ @@ -57,6 +58,7 @@ import { McpSlackModule } from './modules/mcp-slack/mcp-slack.module'; AnalyticsModule, MCPModule, McpSlackModule, + McpTeamsModule, ], controllers: [AppController], providers: [AppService], diff --git a/backend/src/modules/mcp-teams/controllers/mcp-teams.controller.ts b/backend/src/modules/mcp-teams/controllers/mcp-teams.controller.ts new file mode 100644 index 0000000..7093f18 --- /dev/null +++ b/backend/src/modules/mcp-teams/controllers/mcp-teams.controller.ts @@ -0,0 +1,347 @@ +import { + Controller, + Get, + Post, + Body, + Query, + Param, + HttpException, + HttpStatus, +} from '@nestjs/common'; +import { ApiTags, ApiOperation, ApiResponse } from '@nestjs/swagger'; +import { McpTeamsService } from '../services/mcp-teams.service'; +import { + SyncChannelDto, + GetMessagesDto, + GetMeetingsDto, + ProcessRecordingDto, + SyncRecordingsDto, + GetTeamMembersDto, +} from '../dto/teams-mcp.dto'; + +/** + * Teams MCP Controller + * REST API endpoints for Microsoft Teams MCP integration + */ +@ApiTags('teams-mcp') +@Controller('teams-mcp') +export class McpTeamsController { + constructor(private readonly mcpTeamsService: McpTeamsService) {} + + /** + * Check if Teams MCP is available + */ + @Get('status') + @ApiOperation({ summary: 'Check Teams MCP status' }) + @ApiResponse({ status: 200, description: 'MCP status retrieved' }) + async getStatus() { + const isAvailable = this.mcpTeamsService.isAvailable(); + + return { + available: isAvailable, + server: 'teams', + timestamp: new Date().toISOString(), + }; + } + + /** + * List all Teams + */ + @Get('teams') + @ApiOperation({ summary: 'List all Microsoft Teams' }) + @ApiResponse({ status: 200, description: 'Teams retrieved successfully' }) + async listTeams() { + try { + const teams = await this.mcpTeamsService.listTeams(); + + return { + success: true, + count: teams.length, + teams, + }; + } catch (error) { + throw new HttpException( + { + success: false, + message: 'Failed to list teams', + error: error instanceof Error ? error.message : 'Unknown error', + }, + HttpStatus.INTERNAL_SERVER_ERROR, + ); + } + } + + /** + * List channels in a team + */ + @Get('teams/:teamId/channels') + @ApiOperation({ summary: 'List channels in a Microsoft Team' }) + @ApiResponse({ status: 200, description: 'Channels retrieved successfully' }) + async listChannels(@Param('teamId') teamId: string) { + try { + const channels = await this.mcpTeamsService.listChannels(teamId); + + return { + success: true, + teamId, + count: channels.length, + channels, + }; + } catch (error) { + throw new HttpException( + { + success: false, + message: `Failed to list channels for team ${teamId}`, + error: error instanceof Error ? error.message : 'Unknown error', + }, + HttpStatus.INTERNAL_SERVER_ERROR, + ); + } + } + + /** + * Get channel messages + */ + @Get('messages') + @ApiOperation({ summary: 'Get messages from a Teams channel' }) + @ApiResponse({ status: 200, description: 'Messages retrieved successfully' }) + async getMessages(@Query() query: GetMessagesDto) { + try { + const messages = await this.mcpTeamsService.getChannelMessages( + query.teamId, + query.channelId, + { + top: query.top, + skip: query.skip, + }, + ); + + return { + success: true, + teamId: query.teamId, + channelId: query.channelId, + count: messages.length, + messages, + }; + } catch (error) { + throw new HttpException( + { + success: false, + message: 'Failed to get messages', + error: error instanceof Error ? error.message : 'Unknown error', + }, + HttpStatus.INTERNAL_SERVER_ERROR, + ); + } + } + + /** + * Sync channel messages to database + */ + @Post('sync') + @ApiOperation({ summary: 'Sync Teams channel messages to database' }) + @ApiResponse({ status: 200, description: 'Sync completed successfully' }) + async syncChannel(@Body() dto: SyncChannelDto) { + try { + const result = await this.mcpTeamsService.syncChannelHistory(dto.teamId, dto.channelId, { + top: dto.top, + filter: dto.filter, + }); + + return { + success: true, + result, + }; + } catch (error) { + throw new HttpException( + { + success: false, + message: 'Sync failed', + error: error instanceof Error ? error.message : 'Unknown error', + }, + HttpStatus.INTERNAL_SERVER_ERROR, + ); + } + } + + /** + * Get team members + */ + @Get('teams/:teamId/members') + @ApiOperation({ summary: 'Get members of a Microsoft Team' }) + @ApiResponse({ status: 200, description: 'Members retrieved successfully' }) + async getTeamMembers(@Param() params: GetTeamMembersDto) { + try { + const members = await this.mcpTeamsService.getTeamMembers(params.teamId); + + return { + success: true, + teamId: params.teamId, + count: members.length, + members, + }; + } catch (error) { + throw new HttpException( + { + success: false, + message: `Failed to get members for team ${params.teamId}`, + error: error instanceof Error ? error.message : 'Unknown error', + }, + HttpStatus.INTERNAL_SERVER_ERROR, + ); + } + } + + /** + * Get user info + */ + @Get('users/:userId') + @ApiOperation({ summary: 'Get Microsoft Teams user information' }) + @ApiResponse({ status: 200, description: 'User info retrieved' }) + async getUserInfo(@Param('userId') userId: string) { + try { + const user = await this.mcpTeamsService.getUserInfo(userId); + + return { + success: true, + user, + }; + } catch (error) { + throw new HttpException( + { + success: false, + message: `Failed to get user info for ${userId}`, + error: error instanceof Error ? error.message : 'Unknown error', + }, + HttpStatus.INTERNAL_SERVER_ERROR, + ); + } + } + + /** + * Get online meetings + */ + @Get('meetings') + @ApiOperation({ summary: 'Get online meetings' }) + @ApiResponse({ status: 200, description: 'Meetings retrieved successfully' }) + async getMeetings(@Query() query: GetMeetingsDto) { + try { + const meetings = await this.mcpTeamsService.getOnlineMeetings({ + startDateTime: query.startDateTime, + endDateTime: query.endDateTime, + top: query.top, + }); + + return { + success: true, + count: meetings.length, + meetings, + }; + } catch (error) { + throw new HttpException( + { + success: false, + message: 'Failed to get meetings', + error: error instanceof Error ? error.message : 'Unknown error', + }, + HttpStatus.INTERNAL_SERVER_ERROR, + ); + } + } + + /** + * Get meeting recordings + */ + @Get('recordings') + @ApiOperation({ summary: 'Get meeting recordings' }) + @ApiResponse({ status: 200, description: 'Recordings retrieved successfully' }) + async getRecordings(@Query('since') since?: string) { + try { + const sinceDate = since ? new Date(since) : undefined; + const recordings = await this.mcpTeamsService.getMeetingRecordings(sinceDate); + + return { + success: true, + count: recordings.length, + recordings, + }; + } catch (error) { + throw new HttpException( + { + success: false, + message: 'Failed to get recordings', + error: error instanceof Error ? error.message : 'Unknown error', + }, + HttpStatus.INTERNAL_SERVER_ERROR, + ); + } + } + + /** + * Process a specific recording + */ + @Post('recordings/process') + @ApiOperation({ summary: 'Process a Teams meeting recording' }) + @ApiResponse({ status: 200, description: 'Recording processing queued' }) + async processRecording(@Body() dto: ProcessRecordingDto) { + try { + await this.mcpTeamsService.processRecording(dto.recordingId); + + return { + success: true, + message: `Recording ${dto.recordingId} queued for processing`, + recordingId: dto.recordingId, + }; + } catch (error) { + throw new HttpException( + { + success: false, + message: 'Failed to process recording', + error: error instanceof Error ? error.message : 'Unknown error', + }, + HttpStatus.INTERNAL_SERVER_ERROR, + ); + } + } + + /** + * Sync meeting recordings + */ + @Post('recordings/sync') + @ApiOperation({ summary: 'Sync meeting recordings' }) + @ApiResponse({ status: 200, description: 'Recording sync started' }) + async syncRecordings(@Body() dto: SyncRecordingsDto) { + try { + const sinceDate = dto.since ? new Date(dto.since) : undefined; + const recordings = await this.mcpTeamsService.getMeetingRecordings(sinceDate); + + // Process each recording (limit to dto.limit if specified) + const toProcess = dto.limit ? recordings.slice(0, dto.limit) : recordings; + + for (const recording of toProcess) { + try { + await this.mcpTeamsService.processRecording(recording.recordingId); + } catch (error) { + // Continue processing other recordings even if one fails + continue; + } + } + + return { + success: true, + message: 'Recording sync started', + totalFound: recordings.length, + queued: toProcess.length, + }; + } catch (error) { + throw new HttpException( + { + success: false, + message: 'Recording sync failed', + error: error instanceof Error ? error.message : 'Unknown error', + }, + HttpStatus.INTERNAL_SERVER_ERROR, + ); + } + } +} diff --git a/backend/src/modules/mcp-teams/dto/teams-mcp.dto.ts b/backend/src/modules/mcp-teams/dto/teams-mcp.dto.ts new file mode 100644 index 0000000..6179791 --- /dev/null +++ b/backend/src/modules/mcp-teams/dto/teams-mcp.dto.ts @@ -0,0 +1,171 @@ +import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger'; +import { IsString, IsOptional, IsNumber, IsBoolean, Min, Max } from 'class-validator'; + +/** + * DTO for syncing Teams channel messages + */ +export class SyncChannelDto { + @ApiProperty({ + description: 'Microsoft Teams team ID', + example: '19:xxx@thread.tacv2', + }) + @IsString() + teamId: string; + + @ApiProperty({ + description: 'Microsoft Teams channel ID', + example: '19:yyy@thread.tacv2', + }) + @IsString() + channelId: string; + + @ApiPropertyOptional({ + description: 'Maximum number of messages to retrieve', + example: 100, + minimum: 1, + maximum: 1000, + }) + @IsOptional() + @IsNumber() + @Min(1) + @Max(1000) + top?: number; + + @ApiPropertyOptional({ + description: 'Filter query for messages', + example: "createdDateTime gt '2024-01-01'", + }) + @IsOptional() + @IsString() + filter?: string; +} + +/** + * DTO for getting channel messages + */ +export class GetMessagesDto { + @ApiProperty({ + description: 'Microsoft Teams team ID', + example: '19:xxx@thread.tacv2', + }) + @IsString() + teamId: string; + + @ApiProperty({ + description: 'Microsoft Teams channel ID', + example: '19:yyy@thread.tacv2', + }) + @IsString() + channelId: string; + + @ApiPropertyOptional({ + description: 'Number of messages to retrieve', + example: 50, + minimum: 1, + maximum: 1000, + }) + @IsOptional() + @IsNumber() + @Min(1) + @Max(1000) + top?: number; + + @ApiPropertyOptional({ + description: 'Number of messages to skip', + example: 0, + }) + @IsOptional() + @IsNumber() + @Min(0) + skip?: number; +} + +/** + * DTO for getting meetings + */ +export class GetMeetingsDto { + @ApiPropertyOptional({ + description: 'Start date time for meetings', + example: '2024-01-01T00:00:00Z', + }) + @IsOptional() + @IsString() + startDateTime?: string; + + @ApiPropertyOptional({ + description: 'End date time for meetings', + example: '2024-12-31T23:59:59Z', + }) + @IsOptional() + @IsString() + endDateTime?: string; + + @ApiPropertyOptional({ + description: 'Maximum number of meetings', + example: 50, + minimum: 1, + maximum: 100, + }) + @IsOptional() + @IsNumber() + @Min(1) + @Max(100) + top?: number; +} + +/** + * DTO for processing recording + */ +export class ProcessRecordingDto { + @ApiProperty({ + description: 'Recording ID', + example: 'recording-123', + }) + @IsString() + recordingId: string; + + @ApiPropertyOptional({ + description: 'Force reprocessing even if already processed', + example: false, + }) + @IsOptional() + @IsBoolean() + force?: boolean; +} + +/** + * DTO for bulk recording sync + */ +export class SyncRecordingsDto { + @ApiPropertyOptional({ + description: 'Sync recordings since this date', + example: '2024-01-01T00:00:00Z', + }) + @IsOptional() + @IsString() + since?: string; + + @ApiPropertyOptional({ + description: 'Maximum number of recordings to process', + example: 50, + minimum: 1, + maximum: 100, + }) + @IsOptional() + @IsNumber() + @Min(1) + @Max(100) + limit?: number; +} + +/** + * DTO for getting team members + */ +export class GetTeamMembersDto { + @ApiProperty({ + description: 'Microsoft Teams team ID', + example: '19:xxx@thread.tacv2', + }) + @IsString() + teamId: string; +} diff --git a/backend/src/modules/mcp-teams/index.ts b/backend/src/modules/mcp-teams/index.ts new file mode 100644 index 0000000..6a18ddd --- /dev/null +++ b/backend/src/modules/mcp-teams/index.ts @@ -0,0 +1,17 @@ +// Module +export * from './mcp-teams.module'; + +// Services +export * from './services/mcp-teams.service'; + +// Controllers +export * from './controllers/mcp-teams.controller'; + +// Processors +export * from './processors/teams-recording.processor'; + +// Interfaces +export * from './interfaces/teams-mcp.interface'; + +// DTOs +export * from './dto/teams-mcp.dto'; diff --git a/backend/src/modules/mcp-teams/interfaces/teams-mcp.interface.ts b/backend/src/modules/mcp-teams/interfaces/teams-mcp.interface.ts new file mode 100644 index 0000000..a0347e7 --- /dev/null +++ b/backend/src/modules/mcp-teams/interfaces/teams-mcp.interface.ts @@ -0,0 +1,246 @@ +/** + * Teams MCP Interfaces + */ + +/** + * Teams Team + */ +export interface TeamsTeam { + id: string; + displayName: string; + description?: string; + isArchived: boolean; + webUrl?: string; + createdDateTime: string; +} + +/** + * Teams Channel + */ +export interface TeamsChannel { + id: string; + displayName: string; + description?: string; + email?: string; + webUrl?: string; + membershipType?: string; + createdDateTime: string; +} + +/** + * Teams Message + */ +export interface TeamsMessage { + id: string; + messageType: string; + createdDateTime: string; + lastModifiedDateTime?: string; + deletedDateTime?: string; + subject?: string; + body: { + contentType: string; + content: string; + }; + from: { + user?: { + id: string; + displayName: string; + userIdentityType?: string; + }; + application?: { + id: string; + displayName: string; + }; + }; + attachments?: TeamsAttachment[]; + mentions?: TeamsMessageMention[]; + reactions?: TeamsMessageReaction[]; + replyToId?: string; + importance?: string; +} + +/** + * Teams Attachment + */ +export interface TeamsAttachment { + id: string; + contentType: string; + contentUrl?: string; + content?: string; + name?: string; + thumbnailUrl?: string; +} + +/** + * Teams Message Mention + */ +export interface TeamsMessageMention { + id: number; + mentionText: string; + mentioned: { + user?: { + id: string; + displayName: string; + }; + }; +} + +/** + * Teams Message Reaction + */ +export interface TeamsMessageReaction { + reactionType: string; + createdDateTime: string; + user: { + user?: { + id: string; + displayName: string; + }; + }; +} + +/** + * Teams User + */ +export interface TeamsUser { + id: string; + displayName: string; + mail?: string; + userPrincipalName?: string; + jobTitle?: string; + officeLocation?: string; + mobilePhone?: string; +} + +/** + * Teams Member + */ +export interface TeamsMember { + id: string; + displayName: string; + userId: string; + email?: string; + roles?: string[]; +} + +/** + * Teams Online Meeting + */ +export interface TeamsOnlineMeeting { + id: string; + subject: string; + startDateTime: string; + endDateTime: string; + joinUrl?: string; + joinWebUrl?: string; + participants?: { + organizer?: { + identity: { + user?: { + id: string; + displayName: string; + }; + }; + }; + attendees?: Array<{ + identity: { + user?: { + id: string; + displayName: string; + }; + }; + }>; + }; + recordingStatus?: string; + recording?: TeamsRecording; +} + +/** + * Teams Recording + */ +export interface TeamsRecording { + id: string; + meetingId?: string; + meetingOrganizerId?: string; + createdDateTime: string; + recordingContentUrl?: string; + recordingContentType?: string; +} + +/** + * Teams Call Record + */ +export interface TeamsCallRecord { + id: string; + type: string; + startDateTime: string; + endDateTime: string; + participants?: Array<{ + id: string; + displayName?: string; + }>; + organizer?: { + id: string; + displayName?: string; + }; +} + +/** + * Options for retrieving channel messages + */ +export interface GetChannelMessagesOptions { + top?: number; + skip?: number; + filter?: string; + orderBy?: string; +} + +/** + * Options for retrieving meetings + */ +export interface GetMeetingsOptions { + startDateTime?: string; + endDateTime?: string; + top?: number; +} + +/** + * Sync result + */ +export interface TeamsSyncResult { + teamId: string; + channelId: string; + teamName?: string; + channelName?: string; + messagesRetrieved: number; + messagesStored: number; + recordingsFound: number; + errors: string[]; + startedAt: Date; + completedAt: Date; + duration: number; +} + +/** + * Meeting recording metadata + */ +export interface TeamsMeetingRecording { + recordingId: string; + meetingId: string; + meetingTitle: string; + organizerId: string; + organizerName?: string; + startDateTime: Date; + endDateTime?: Date; + recordingUrl: string; + contentType: string; +} + +/** + * Workspace info + */ +export interface TeamsOrganization { + id: string; + displayName: string; + tenantId?: string; +} diff --git a/backend/src/modules/mcp-teams/mcp-teams.module.ts b/backend/src/modules/mcp-teams/mcp-teams.module.ts new file mode 100644 index 0000000..7df35fa --- /dev/null +++ b/backend/src/modules/mcp-teams/mcp-teams.module.ts @@ -0,0 +1,41 @@ +import { Module } from '@nestjs/common'; +import { BullModule } from '@nestjs/bull'; +import { McpTeamsService } from './services/mcp-teams.service'; +import { McpTeamsController } from './controllers/mcp-teams.controller'; +import { TeamsRecordingProcessor } from './processors/teams-recording.processor'; +import { MCPModule } from '../mcp/mcp.module'; +import { DatabaseModule } from '../../database/database.module'; +import { StorageModule } from '../../common/storage/storage.module'; +import { TranscriptionModule } from '../transcription/transcription.module'; + +/** + * Teams MCP Module + * Provides Microsoft Teams integration via Model Context Protocol + */ +@Module({ + imports: [ + // MCP Core + MCPModule, + + // Database + DatabaseModule, + + // Storage + StorageModule, + + // Transcription + TranscriptionModule, + + // Bull Queues + BullModule.registerQueue({ + name: 'teams-recording-processing', + }), + BullModule.registerQueue({ + name: 'nlp-processing', + }), + ], + controllers: [McpTeamsController], + providers: [McpTeamsService, TeamsRecordingProcessor], + exports: [McpTeamsService], +}) +export class McpTeamsModule {} diff --git a/backend/src/modules/mcp-teams/processors/teams-recording.processor.ts b/backend/src/modules/mcp-teams/processors/teams-recording.processor.ts new file mode 100644 index 0000000..1b4f0e1 --- /dev/null +++ b/backend/src/modules/mcp-teams/processors/teams-recording.processor.ts @@ -0,0 +1,156 @@ +import { Process, Processor } from '@nestjs/bull'; +import { Logger } from '@nestjs/common'; +import { Job } from 'bull'; +import { InjectQueue } from '@nestjs/bull'; +import { Queue } from 'bull'; +import { PrismaService } from '../../../database/prisma.service'; +import { TranscriptionService } from '../../transcription/transcription.service'; + +/** + * Teams Recording Processor + * Handles background processing of Teams meeting recordings + */ +@Processor('teams-recording-processing') +export class TeamsRecordingProcessor { + private readonly logger = new Logger(TeamsRecordingProcessor.name); + + constructor( + private readonly prisma: PrismaService, + private readonly transcriptionService: TranscriptionService, + @InjectQueue('nlp-processing') private nlpQueue: Queue, + ) {} + + /** + * Process recording transcription job + * This delegates to the main TranscriptionService for actual transcription + */ + @Process('transcribe') + async handleTranscription(job: Job<{ recordingId: string; s3Key: string }>) { + const { recordingId } = job.data; + + this.logger.log(`Processing transcription for recording ${recordingId}`); + + try { + // Update job progress + await job.progress(10); + + // Use the existing TranscriptionService to process transcription + await this.transcriptionService.processTranscription(recordingId); + + await job.progress(90); + + // Get the created transcription + const transcription = await this.prisma.transcription.findFirst({ + where: { audioRecordingId: recordingId }, + }); + + if (transcription) { + // Queue NLP processing for the transcription + await this.nlpQueue.add('process-transcription', { + transcriptionId: transcription.id, + audioRecordingId: recordingId, + }); + + this.logger.log(`Queued NLP processing for transcription ${transcription.id}`); + } + + await job.progress(100); + + return { + success: true, + transcriptionId: transcription?.id, + recordingId, + }; + } catch (error) { + this.logger.error(`Transcription failed for ${recordingId}: ${error}`); + + // Update recording with error + try { + await this.prisma.audioRecording.update({ + where: { id: recordingId }, + data: { + mcpMetadata: { + transcriptionError: error instanceof Error ? error.message : 'Unknown error', + transcriptionFailedAt: new Date(), + }, + }, + }); + } catch (updateError) { + this.logger.error(`Failed to update recording error: ${updateError}`); + } + + throw error; + } + } + + /** + * Process bulk recording sync + */ + @Process('bulk-sync') + async handleBulkSync(job: Job<{ recordingIds: string[]; since?: Date }>) { + const { recordingIds } = job.data; + + this.logger.log(`Processing bulk recording sync: ${recordingIds.length} recordings`); + + const results = { + total: recordingIds.length, + processed: 0, + failed: 0, + skipped: 0, + errors: [] as string[], + }; + + for (let i = 0; i < recordingIds.length; i++) { + const recordingId = recordingIds[i]; + + try { + await job.progress(Math.floor((i / recordingIds.length) * 100)); + + // Check if already processed + const existing = await this.prisma.audioRecording.findFirst({ + where: { + source: 'teams-mcp', + sourceId: recordingId, + }, + }); + + if (existing) { + this.logger.debug(`Recording ${recordingId} already processed, skipping`); + results.skipped++; + continue; + } + + // Queue individual transcription job + // Note: The actual recording processing is handled by McpTeamsService.processRecording + // This job just coordinates the bulk operation + + results.processed++; + } catch (error) { + const errorMsg = error instanceof Error ? error.message : 'Unknown error'; + this.logger.error(`Failed to process ${recordingId}: ${errorMsg}`); + results.failed++; + results.errors.push(`${recordingId}: ${errorMsg}`); + } + + // Rate limiting: wait 2 seconds between recordings + if (i < recordingIds.length - 1) { + await this.delay(2000); + } + } + + await job.progress(100); + + this.logger.log( + `Bulk sync completed: ${results.processed} processed, ${results.failed} failed, ${results.skipped} skipped`, + ); + + return results; + } + + /** + * Helper: Delay execution + */ + private delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } +} diff --git a/backend/src/modules/mcp-teams/services/mcp-teams.service.ts b/backend/src/modules/mcp-teams/services/mcp-teams.service.ts new file mode 100644 index 0000000..5a20d9a --- /dev/null +++ b/backend/src/modules/mcp-teams/services/mcp-teams.service.ts @@ -0,0 +1,527 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { InjectQueue } from '@nestjs/bull'; +import { Queue } from 'bull'; +import { PrismaService } from '../../../database/prisma.service'; +import { MCPToolInvokerService } from '../../mcp/services/mcp-tool-invoker.service'; +import { MCPConnectionManagerService } from '../../mcp/services/mcp-connection-manager.service'; +import { StorageService } from '../../../common/storage/storage.service'; +import { + TeamsTeam, + TeamsChannel, + TeamsMessage, + TeamsUser, + TeamsMember, + TeamsOnlineMeeting, + TeamsMeetingRecording, + GetChannelMessagesOptions, + GetMeetingsOptions, + TeamsSyncResult, +} from '../interfaces/teams-mcp.interface'; + +/** + * Teams MCP Service + * Handles Microsoft Teams data retrieval via MCP server + */ +@Injectable() +export class McpTeamsService implements OnModuleInit { + private readonly logger = new Logger(McpTeamsService.name); + private readonly SERVER_NAME = 'teams'; + + constructor( + private readonly mcpToolInvoker: MCPToolInvokerService, + private readonly mcpConnectionManager: MCPConnectionManagerService, + private readonly prisma: PrismaService, + private readonly storage: StorageService, + @InjectQueue('nlp-processing') private nlpQueue: Queue, + @InjectQueue('teams-recording-processing') private recordingQueue: Queue, + ) {} + + async onModuleInit() { + this.logger.log('Teams MCP Service initialized'); + } + + /** + * Check if Teams MCP is available + */ + isAvailable(): boolean { + return this.mcpConnectionManager.isServerConnected(this.SERVER_NAME); + } + + /** + * List all teams + */ + async listTeams(): Promise { + this.logger.debug('Listing Teams via MCP'); + + try { + const result = await this.mcpToolInvoker.invokeTool({ + serverName: this.SERVER_NAME, + toolName: 'teams_list_teams', + parameters: {}, + }); + + if (!result.success || !result.data) { + throw new Error(result.error || 'Failed to list teams'); + } + + return this.parseTeams(result.data); + } catch (error) { + this.logger.error(`Failed to list teams: ${error}`); + throw error; + } + } + + /** + * List channels in a team + */ + async listChannels(teamId: string): Promise { + this.logger.debug(`Listing channels for team ${teamId}`); + + try { + const result = await this.mcpToolInvoker.invokeTool({ + serverName: this.SERVER_NAME, + toolName: 'teams_list_channels', + parameters: { + teamId, + }, + }); + + if (!result.success || !result.data) { + throw new Error(result.error || 'Failed to list channels'); + } + + return this.parseChannels(result.data); + } catch (error) { + this.logger.error(`Failed to list channels for ${teamId}: ${error}`); + throw error; + } + } + + /** + * Get channel messages + */ + async getChannelMessages( + teamId: string, + channelId: string, + options?: GetChannelMessagesOptions, + ): Promise { + this.logger.debug(`Getting messages for team ${teamId}, channel ${channelId}`); + + try { + const result = await this.mcpToolInvoker.invokeTool({ + serverName: this.SERVER_NAME, + toolName: 'teams_get_channel_messages', + parameters: { + teamId, + channelId, + top: options?.top || 50, + skip: options?.skip, + filter: options?.filter, + orderBy: options?.orderBy, + }, + }); + + if (!result.success || !result.data) { + throw new Error(result.error || 'Failed to get channel messages'); + } + + return this.parseMessages(result.data); + } catch (error) { + this.logger.error(`Failed to get messages for ${teamId}/${channelId}: ${error}`); + throw error; + } + } + + /** + * Get message replies + */ + async getMessageReplies( + teamId: string, + channelId: string, + messageId: string, + ): Promise { + this.logger.debug(`Getting replies for message ${messageId}`); + + try { + const result = await this.mcpToolInvoker.invokeTool({ + serverName: this.SERVER_NAME, + toolName: 'teams_get_message_replies', + parameters: { + teamId, + channelId, + messageId, + }, + }); + + if (!result.success || !result.data) { + throw new Error(result.error || 'Failed to get message replies'); + } + + return this.parseMessages(result.data); + } catch (error) { + this.logger.error(`Failed to get replies for message ${messageId}: ${error}`); + throw error; + } + } + + /** + * Get team members + */ + async getTeamMembers(teamId: string): Promise { + this.logger.debug(`Getting members for team ${teamId}`); + + try { + const result = await this.mcpToolInvoker.invokeTool({ + serverName: this.SERVER_NAME, + toolName: 'teams_get_members', + parameters: { + teamId, + }, + }); + + if (!result.success || !result.data) { + throw new Error(result.error || 'Failed to get team members'); + } + + return this.parseMembers(result.data); + } catch (error) { + this.logger.error(`Failed to get members for ${teamId}: ${error}`); + throw error; + } + } + + /** + * Get user info + */ + async getUserInfo(userId: string): Promise { + this.logger.debug(`Getting user info for ${userId}`); + + try { + const result = await this.mcpToolInvoker.invokeTool({ + serverName: this.SERVER_NAME, + toolName: 'teams_get_user', + parameters: { + userId, + }, + }); + + if (!result.success || !result.data) { + throw new Error(result.error || 'Failed to get user info'); + } + + return this.parseUser(result.data); + } catch (error) { + this.logger.error(`Failed to get user info for ${userId}: ${error}`); + throw error; + } + } + + /** + * Get online meetings + */ + async getOnlineMeetings(options?: GetMeetingsOptions): Promise { + this.logger.debug('Getting online meetings'); + + try { + const result = await this.mcpToolInvoker.invokeTool({ + serverName: this.SERVER_NAME, + toolName: 'teams_list_meetings', + parameters: { + startDateTime: options?.startDateTime, + endDateTime: options?.endDateTime, + top: options?.top || 50, + }, + }); + + if (!result.success || !result.data) { + throw new Error(result.error || 'Failed to get meetings'); + } + + return this.parseMeetings(result.data); + } catch (error) { + this.logger.error(`Failed to get meetings: ${error}`); + throw error; + } + } + + /** + * Get meeting recordings + */ + async getMeetingRecordings(since?: Date): Promise { + this.logger.debug('Getting meeting recordings'); + + try { + const meetings = await this.getOnlineMeetings({ + startDateTime: since?.toISOString(), + top: 100, + }); + + const recordings: TeamsMeetingRecording[] = []; + + for (const meeting of meetings) { + if (meeting.recordingStatus === 'available' && meeting.recording) { + const recording = meeting.recording; + recordings.push({ + recordingId: recording.id, + meetingId: meeting.id, + meetingTitle: meeting.subject, + organizerId: meeting.participants?.organizer?.identity?.user?.id || 'unknown', + organizerName: meeting.participants?.organizer?.identity?.user?.displayName, + startDateTime: new Date(meeting.startDateTime), + endDateTime: new Date(meeting.endDateTime), + recordingUrl: recording.recordingContentUrl || '', + contentType: recording.recordingContentType || 'video/mp4', + }); + } + } + + this.logger.log(`Found ${recordings.length} meeting recordings`); + return recordings; + } catch (error) { + this.logger.error(`Failed to get meeting recordings: ${error}`); + throw error; + } + } + + /** + * Download recording + */ + async downloadRecording(recordingUrl: string): Promise { + this.logger.debug(`Downloading recording from ${recordingUrl}`); + + try { + const result = await this.mcpToolInvoker.invokeTool({ + serverName: this.SERVER_NAME, + toolName: 'teams_download_recording', + parameters: { + url: recordingUrl, + }, + }); + + if (!result.success || !result.data) { + throw new Error(result.error || 'Failed to download recording'); + } + + // Convert base64 or buffer to Buffer + if (typeof result.data === 'string') { + return Buffer.from(result.data, 'base64'); + } else if (Buffer.isBuffer(result.data)) { + return result.data; + } else if (result.data.content) { + return Buffer.from(result.data.content, 'base64'); + } + + throw new Error('Invalid recording data format'); + } catch (error) { + this.logger.error(`Failed to download recording: ${error}`); + throw error; + } + } + + /** + * Store messages from MCP to database + */ + async storeMessagesFromMCP( + messages: TeamsMessage[], + teamId: string, + channelId: string, + ): Promise { + this.logger.debug( + `Storing ${messages.length} messages from team ${teamId}, channel ${channelId}`, + ); + + let storedCount = 0; + + for (const message of messages) { + try { + const timestamp = new Date(message.createdDateTime); + const userId = message.from?.user?.id || 'unknown'; + const userName = message.from?.user?.displayName || 'unknown'; + + const stored = await this.prisma.message.upsert({ + where: { + source_sourceId: { + source: 'teams-mcp', + sourceId: message.id, + }, + }, + update: { + content: message.body.content, + timestamp, + mcpRetrieved: true, + mcpMetadata: message as any, + }, + create: { + source: 'teams-mcp', + sourceId: message.id, + channelId, + authorId: userId, + authorName: userName, + content: message.body.content, + messageType: message.messageType || 'message', + timestamp, + metadata: message as any, + mcpRetrieved: true, + mcpMetadata: message as any, + }, + }); + + // Queue NLP processing + await this.nlpQueue.add('process-message', { + messageId: stored.id, + }); + + storedCount++; + } catch (error) { + this.logger.error(`Failed to store message ${message.id}: ${error}`); + } + } + + this.logger.log(`Stored ${storedCount}/${messages.length} messages`); + return storedCount; + } + + /** + * Sync channel history + */ + async syncChannelHistory( + teamId: string, + channelId: string, + options?: GetChannelMessagesOptions, + ): Promise { + const startTime = new Date(); + this.logger.log(`Starting sync for team ${teamId}, channel ${channelId}`); + + const result: TeamsSyncResult = { + teamId, + channelId, + messagesRetrieved: 0, + messagesStored: 0, + recordingsFound: 0, + errors: [], + startedAt: startTime, + completedAt: new Date(), + duration: 0, + }; + + try { + // Get messages + const messages = await this.getChannelMessages(teamId, channelId, options); + result.messagesRetrieved = messages.length; + + // Store messages + result.messagesStored = await this.storeMessagesFromMCP(messages, teamId, channelId); + + this.logger.log( + `Sync completed for ${teamId}/${channelId}: ${result.messagesStored} messages stored`, + ); + } catch (error) { + const errorMsg = error instanceof Error ? error.message : 'Unknown error'; + result.errors.push(errorMsg); + this.logger.error(`Sync failed for ${teamId}/${channelId}: ${errorMsg}`); + } + + result.completedAt = new Date(); + result.duration = result.completedAt.getTime() - result.startedAt.getTime(); + + return result; + } + + /** + * Process meeting recording + */ + async processRecording(recordingId: string): Promise { + this.logger.log(`Processing recording ${recordingId}`); + + try { + // Check if already processed + const existing = await this.prisma.audioRecording.findFirst({ + where: { + source: 'teams-mcp', + sourceId: recordingId, + }, + }); + + if (existing) { + this.logger.log(`Recording ${recordingId} already processed`); + return; + } + + // Get all recordings and find this one + const recordings = await this.getMeetingRecordings(); + const recording = recordings.find((r) => r.recordingId === recordingId); + + if (!recording) { + throw new Error(`Recording ${recordingId} not found`); + } + + // Download recording + const recordingBuffer = await this.downloadRecording(recording.recordingUrl); + + // Upload to S3 + const s3Key = this.storage.generateRecordingKey('teams', recordingId, 'mp4'); + await this.storage.uploadFile(s3Key, recordingBuffer, recording.contentType); + + // Create database record + const audioRecording = await this.prisma.audioRecording.create({ + data: { + source: 'teams-mcp', + sourceId: recordingId, + meetingTitle: recording.meetingTitle, + s3Key, + timestamp: recording.startDateTime, + mcpRetrieved: true, + mcpMetadata: recording as any, + }, + }); + + // Queue transcription + await this.recordingQueue.add('transcribe', { + recordingId: audioRecording.id, + s3Key, + }); + + this.logger.log(`Queued transcription for recording ${recordingId}`); + } catch (error) { + this.logger.error(`Failed to process recording ${recordingId}: ${error}`); + throw error; + } + } + + /** + * Parser helpers + */ + private parseTeams(data: any): TeamsTeam[] { + if (data.value) return data.value; + if (Array.isArray(data)) return data; + return []; + } + + private parseChannels(data: any): TeamsChannel[] { + if (data.value) return data.value; + if (Array.isArray(data)) return data; + return []; + } + + private parseMessages(data: any): TeamsMessage[] { + if (data.value) return data.value; + if (Array.isArray(data)) return data; + return []; + } + + private parseMembers(data: any): TeamsMember[] { + if (data.value) return data.value; + if (Array.isArray(data)) return data; + return []; + } + + private parseUser(data: any): TeamsUser { + return data; + } + + private parseMeetings(data: any): TeamsOnlineMeeting[] { + if (data.value) return data.value; + if (Array.isArray(data)) return data; + return []; + } +}