diff --git a/Sources/App/Controllers/AuthController.swift b/Sources/App/Controllers/AuthController.swift deleted file mode 100644 index 8b13789..0000000 --- a/Sources/App/Controllers/AuthController.swift +++ /dev/null @@ -1 +0,0 @@ - diff --git a/Sources/App/Controllers/ChatController.swift b/Sources/App/Controllers/ChatController.swift index d3c39b5..65aaab7 100644 --- a/Sources/App/Controllers/ChatController.swift +++ b/Sources/App/Controllers/ChatController.swift @@ -11,17 +11,18 @@ import MongoKitten import JWT import AddaSharedModels -extension ChatController: RouteCollection { +extension WebsocketController: RouteCollection { func boot(routes: RoutesBuilder) throws { routes.webSocket(onUpgrade: self.webSocket) } } -struct ChatController { - let wsController: WebSocketController - - func webSocket(_ req: Request, socket: WebSocket) { - self.wsController.connect(socket, req: req) +struct WebsocketController { + let wsController: WebsocketHandle + + func webSocket(_ req: Request, ws: WebSocket) { + Task { + await self.wsController.connectionHandler(ws: ws, req: req) + } } - } diff --git a/Sources/App/Controllers/WebSocketController.swift b/Sources/App/Controllers/WebSocketController.swift index 36ab723..eb0b70e 100644 --- a/Sources/App/Controllers/WebSocketController.swift +++ b/Sources/App/Controllers/WebSocketController.swift @@ -11,77 +11,77 @@ import MongoKitten import AddaSharedModels import NIOConcurrencyHelpers -class WebSocketController { - let lock: NIOLock - let db: Database - let logger: Logger - var chatClients: WebsocketClients - - init(eventLoop: EventLoop, db: Database) { - self.lock = NIOLock() - self.db = db - self.logger = Logger(label: "WebSocketController") - self.chatClients = WebsocketClients(eventLoop: eventLoop) - } - - func connect(_ ws: WebSocket, req: Request) { - - ws.onPong { ws in - ws.onText { (ws, text) in - print(#line, text) - } - } - - ws.onPong { ws in - ws.onText { (ws, text) in - print(#line, text) - } - } - - ws.onText { [self] ws, text in - guard let data = text.data(using: .utf8) else { - logger.error( "Wrong encoding for received message") - return - } - - let string = String(data: data, encoding: .utf8) - print(#line, string as Any) - - let chatOutGoingEvent = ChatOutGoingEvent.decode(data: data) - - switch chatOutGoingEvent { - - // from client to server - case .connect(let user): - let userID = user.id - let client = ChatClient(id: userID, socket: ws) - chatClients.add(client) - - // from client to server - case .disconnect(let user): - let userID = user.id - let client = ChatClient(id: userID, socket: ws) - chatClients.remove(client) - - // from client to server & server to client - case .message(let msg): - print(#line, msg) - chatClients.send(msg, req: req) - - // from server to client - case .conversation(let lastMessage): - print(#line, lastMessage) - chatClients.send(lastMessage, req: req) - - case .notice(let msg): - print(#line, msg) - - case .error(let error): - print(#line, error) - logger.error("(error)") - case .none: - print(#line, "decode error") - } - } - } -} +//class WebSocketController { +// let lock: NIOLock +// let db: Database +// let logger: Logger +// var chatClients: WebsocketClients +// +// init(eventLoop: EventLoop, db: Database) { +// self.lock = NIOLock() +// self.db = db +// self.logger = Logger(label: "WebSocketController") +// self.chatClients = WebsocketClients(eventLoop: eventLoop) +// } +// +// func connect(_ ws: WebSocket, req: Request) { +// +// ws.onPong { ws in +// ws.onText { (ws, text) in +// print(#line, text) +// } +// } +// +// ws.onPong { ws in +// ws.onText { (ws, text) in +// print(#line, text) +// } +// } +// +// ws.onText { [self] ws, text in +// guard let data = text.data(using: .utf8) else { +// logger.error( "Wrong encoding for received message") +// return +// } +// +// let string = String(data: data, encoding: .utf8) +// print(#line, string as Any) +// +// let chatOutGoingEvent = ChatOutGoingEvent.decode(data: data) +// +// switch chatOutGoingEvent { +// +// // from client to server +// case .connect(let user): +// let userID = user.id +// let client = ChatClient(id: userID, socket: ws) +// chatClients.add(client) +// +// // from client to server +// case .disconnect(let user): +// let userID = user.id +// let client = ChatClient(id: userID, socket: ws) +// chatClients.remove(client) +// +// // from client to server & server to client +// case .message(let msg): +// print(#line, msg) +// chatClients.send(msg, req: req) +// +// // from server to client +// case .conversation(let lastMessage): +// print(#line, lastMessage) +// chatClients.send(lastMessage, req: req) +// +// case .notice(let msg): +// print(#line, msg) +// +// case .error(let error): +// print(#line, error) +// logger.error("(error)") +// case .none: +// print(#line, "decode error") +// } +// } +// } +//} diff --git a/Sources/App/RouteHandlers/AuthEngineHandlers/AuthenticationHandler.swift b/Sources/App/RouteHandlers/AuthEngineHandlers/AuthenticationHandler.swift index 39ff4ea..347d82a 100644 --- a/Sources/App/RouteHandlers/AuthEngineHandlers/AuthenticationHandler.swift +++ b/Sources/App/RouteHandlers/AuthEngineHandlers/AuthenticationHandler.swift @@ -178,6 +178,7 @@ private func emailVerificationResponseForValidUser( let refreshToken = try req.application.jwt.signers.sign(refreshPayload) let access = RefreshTokenResponse(accessToken: accessToken, refreshToken: refreshToken) + req.payload = userPayload try await VerificationCodeAttempt.query(on: req.db) .filter(\.$id == input.attemptId) diff --git a/Sources/App/RouteHandlers/AuthEngineHandlers/DeviceHandler.swift b/Sources/App/RouteHandlers/AuthEngineHandlers/DeviceHandler.swift index 20eaba7..f579f39 100644 --- a/Sources/App/RouteHandlers/AuthEngineHandlers/DeviceHandler.swift +++ b/Sources/App/RouteHandlers/AuthEngineHandlers/DeviceHandler.swift @@ -26,13 +26,22 @@ public func devicesHandler( token: input.token, voipToken: input.voipToken ) + + /// we need it becz we add url for not login users + if let token = request.accessToken { + do { + request.payload = try request.jwt.verify(Array(token.utf8), as: Payload.self) + } + } if request.loggedIn { currentUserID = request.payload.user.id newInput.ownerId = request.payload.user.id } + newInput.ownerId = currentUserID let data = DeviceModel( + identifierForVendor: newInput.identifierForVendor, name: newInput.name, model: newInput.model, osVersion: newInput.osVersion, @@ -40,17 +49,17 @@ public func devicesHandler( voipToken: newInput.voipToken, userId: currentUserID ) - - let device = try await DeviceModel.query(on: request.db) + + guard let device = try await DeviceModel.query(on: request.db) .filter(\.$token == input.token) .first() .get() - - guard let device = device else { + + else { try await data.save(on: request.db).get() return data.res } - + try await device.update(newInput) try await device.update(on: request.db) return device.res diff --git a/Sources/App/RouteHandlers/AuthEngineHandlers/UserHandler.swift b/Sources/App/RouteHandlers/AuthEngineHandlers/UserHandler.swift index 57933b0..1b6f4ed 100644 --- a/Sources/App/RouteHandlers/AuthEngineHandlers/UserHandler.swift +++ b/Sources/App/RouteHandlers/AuthEngineHandlers/UserHandler.swift @@ -289,16 +289,20 @@ public func userHandler( .get() return user.response + case let .devices(devicesRoute): return try await devicesHandler(request: request, route: devicesRoute) + case let .attachments(attachmentsRoute): return try await attachmentsHandler(request: request, route: attachmentsRoute) + case let .conversations(conversationsRoute): return try await conversationsHandler( request: request, usersId: usersId, route: conversationsRoute ) + case .events(_): return Response(status: .badRequest) } diff --git a/Sources/App/RouteHandlers/AuthEngineHandlers/UsersHandler.swift b/Sources/App/RouteHandlers/AuthEngineHandlers/UsersHandler.swift index 8437e30..1e70c8f 100644 --- a/Sources/App/RouteHandlers/AuthEngineHandlers/UsersHandler.swift +++ b/Sources/App/RouteHandlers/AuthEngineHandlers/UsersHandler.swift @@ -13,6 +13,7 @@ public func usersHandler( switch route { case .user(id: let id, route: let userRoute): return try await userHandler(request: request, usersId: id, route: userRoute) + case .update(input: let input): if request.loggedIn == false { throw Abort(.unauthorized) } guard let currentUserID = request.payload.user.id else { diff --git a/Sources/App/Webbsockets/ChatClient.swift b/Sources/App/Webbsockets/ChatClient.swift index 7a8dcca..a404f19 100644 --- a/Sources/App/Webbsockets/ChatClient.swift +++ b/Sources/App/Webbsockets/ChatClient.swift @@ -11,51 +11,115 @@ import Fluent import AddaSharedModels import APNS -final class ChatClient: WebSocketClient, Hashable { +actor WebsocketClients { + private var allSockets: [ObjectId: WebSocket] = [:] + private let logger = Logger(label: "WebsocketClients") - let logger: Logger = Logger(label: "ChatClient") - - override init(id: ObjectId, socket: WebSocket) { - super.init(id: id, socket: socket) + func activeSockets(senderId: ObjectId) -> [WebSocket] { + let allExceptSender = allSockets.filter { $0.key != senderId } + return allExceptSender.values.filter { !$0.isClosed } } - func send(_ event: ChatOutGoingEvent) { - guard let text = event.jsonString else { - logger.error("Error occer when convert ChatOutGoingEvent to jsonString") - return + func join(id: ObjectId, on ws: WebSocket) { + self.allSockets[id] = ws + } + + func find(id: ObjectId) -> WebSocket? { + return self.allSockets[id] + } + + func leave(id: ObjectId) { + self.allSockets.removeValue(forKey: id) + } + + func send(msg: MessageItem, req: Request) async throws { + guard let senderID = req.payload.user.id else { + throw Abort(.notFound, reason: "current User missing from payload") + } + + let messageCreate = MessageModel(msg, senderId: senderID, receipientId: nil) + + do { + try await messageCreate.save(on: req.db) + + for socket in activeSockets(senderId: senderID) { + try await send(message: messageCreate, req: req, socket: socket) + } + + try await sendNotificationToConversationMembers( + msgItem: msg, + senderID: senderID, + with: req + ) + + } catch { + messageCreate.isDelivered = false } - socket.send(text) } - - func send(_ message: MessageModel, _ req: Request) { - guard req.loggedIn != false else { + + @Sendable func send(message: MessageModel, req: Request, socket: WebSocket) async throws { + if !req.loggedIn { logger.error("\(#line) Unauthorized send message") - return + throw Abort(.unauthorized) } - MessageModel.query(on: req.db) + try await MessageModel.query(on: req.db) .with(\.$sender) { $0.with(\.$attachments) } .with(\.$recipient) { $0.with(\.$attachments) } .filter(\.$id == message.id!) .first() - .unwrap(or: Abort(.notFound, reason: "No Message found! by id: \(id)")) + .unwrap(or: Abort(.notFound, reason: "No Message found! by id: \(message.id?.hexString ?? "")")) .map { original in let message = ChatOutGoingEvent.message(original.response).jsonString let lastMessage = ChatOutGoingEvent.conversation(original.response).jsonString - self.socket.send(message ?? "") - self.socket.send(lastMessage ?? "") - + socket.send(message ?? "") + socket.send(lastMessage ?? "") } - + .get() } - static func == (lhs: ChatClient, rhs: ChatClient) -> Bool { - return lhs.id == rhs.id - } + @Sendable private func sendNotificationToConversationMembers( + msgItem: MessageItem, + senderID: ObjectId, + with req: Request + ) async throws { + + guard let conversation = try await ConversationModel.query(on: req.db) + .with(\.$members) + .filter(\.$id == msgItem.conversationId) + .first() + .get() + + else { + throw Abort(.notFound, reason: "No Conversation found! by ID \(msgItem.conversationId.hexString)") + } + + for member in conversation.members where member.id != senderID { + + guard let memberID = member.id else { + throw Abort(.notFound, reason: "current User missing from payload") + } + + guard let device = try await DeviceModel.query(on: req.db) + .filter(\.$user.$id == memberID) + .first() + .get() + + else { + throw Abort(.notFound, reason: "User not found from \(#function)") + } + + try await req.apns.send( + .init( + title: conversation.title, + subtitle: msgItem.messageBody + ), + to: device.token + ).get() + + } + } - func hash(into hasher: inout Hasher) { - hasher.combine(id) - } } diff --git a/Sources/App/Webbsockets/ChatHandle.swift b/Sources/App/Webbsockets/ChatHandle.swift index daf2cd6..23d00fe 100644 --- a/Sources/App/Webbsockets/ChatHandle.swift +++ b/Sources/App/Webbsockets/ChatHandle.swift @@ -10,13 +10,14 @@ import Foundation import MongoKitten import AddaSharedModels -class ChatHandle { - var chatClients: WebsocketClients - - init(eventLoop: EventLoop) { - self.chatClients = WebsocketClients(eventLoop: eventLoop) +actor WebsocketHandle { + + private var wsClients: WebsocketClients + + init(wsClients: WebsocketClients) { + self.wsClients = wsClients } - + func connectionHandler(ws: WebSocket, req: Request) { ws.onPong { ws in @@ -25,11 +26,11 @@ class ChatHandle { } } - ws.onPong { ws in - ws.onText { (ws, text) in - print(#line, text) - } - } +// ws.onPing { ws in +// ws.onText { (ws, text) in +// print(#line, text) +// } +// } ws.onText { [self] ws, text in guard let data = text.data(using: .utf8) else { @@ -41,36 +42,45 @@ class ChatHandle { print(#line, string as Any) guard let chatOutGoingEvent = ChatOutGoingEvent.decode(data: data) else { - ws.close(code: .unacceptableData) + req.logger.notice("unacceptableData for connect web socket") + Task { + try await ws.close(code: .unacceptableData) + } return } switch chatOutGoingEvent { case .connect(let user): - let userID = user.id - - let client = ChatClient(id: userID, socket: ws) - chatClients.add(client) - req.logger.info("web socker connect for user \(user.email ?? user.fullName)") + let userID = user.id + Task { + await wsClients.join(id: userID, on: ws) + } + req.logger.info("web socker connect for user \(user.email ?? user.fullName ?? "")") case .disconnect(let user): let userID = user.id - let client = ChatClient(id: userID, socket: ws) - chatClients.remove(client) - req.logger.info("web socker remove for user \(user.email ?? user.fullName)") + Task { + await wsClients.leave(id: userID) + } + req.logger.info("web socker remove for user \(user.email ?? user.fullName ?? "")") case .message(let msg): - chatClients.send(msg, req: req) - + Task { + try await wsClients.send(msg: msg, req: req) + } + case .conversation(let lastMessage): - print(#line, lastMessage) - chatClients.send(lastMessage, req: req) - + + Task { + try await wsClients.send(msg: lastMessage, req: req) + } + + req.logger.info("conversation conversation: \(lastMessage)") case .notice(let msg): print(#line, msg) case .error(let error): - print(#line, error) + req.logger.info("error: \(error.localizedDescription)") } } } diff --git a/Sources/App/Webbsockets/WebSocketClient.swift b/Sources/App/Webbsockets/WebSocketClient.swift deleted file mode 100644 index 70dd950..0000000 --- a/Sources/App/Webbsockets/WebSocketClient.swift +++ /dev/null @@ -1,19 +0,0 @@ -// -// WebSocketClient.swift -// -// -// Created by Alif on 19/6/20. -// - -import Vapor -import MongoKitten - -open class WebSocketClient { - open var id: ObjectId - open var socket: WebSocket - - public init(id: ObjectId, socket: WebSocket) { - self.id = id - self.socket = socket - } -} diff --git a/Sources/App/Webbsockets/WebSocketClients.swift b/Sources/App/Webbsockets/WebSocketClients.swift deleted file mode 100644 index 1eb9bbf..0000000 --- a/Sources/App/Webbsockets/WebSocketClients.swift +++ /dev/null @@ -1,113 +0,0 @@ -// -// WebsocketClients.swift -// -// -// Created by Alif on 19/6/20. -// - -import Vapor -import MongoKitten -import AddaSharedModels -import NIOConcurrencyHelpers - -final class WebsocketClients { - - let lock: NIOLock - var eventLoop: EventLoop - var allCliendts: [ObjectId: WebSocketClient] - let logger: Logger - - var activeClients: [WebSocketClient] { - self.lock.withLock { - self.allCliendts.values.filter { !$0.socket.isClosed } - } - } - - init(eventLoop: EventLoop, clients: [ObjectId: WebSocketClient] = [:]) { - self.eventLoop = eventLoop - self.allCliendts = clients - self.logger = Logger(label: "WebsocketClients") - self.lock = NIOLock() - } - - func add(_ client: WebSocketClient) { - self.lock.withLock { - self.allCliendts[client.id] = client - } - } - - func remove(_ client: WebSocketClient) { - self.lock.withLock { - self.allCliendts[client.id] = nil - } - } - - func find(_ objectId: ObjectId) -> WebSocketClient? { - self.lock.withLock { - return self.allCliendts[objectId] - } - } - - fileprivate func sendNotificationToConversationMembers(_ msg: MessageModel, _ req: Request) -> EventLoopFuture<()> { - return msg.$conversation.query(on: req.db) - .with(\.$members) { - $0.with(\.$devices) { - $0.with(\.$user) { $0.with(\.$attachments) } - } - } - .first() - .unwrap(or: Abort(.noContent) ) - .map { conversation in - for user in conversation.members where user.id != req.payload.user.id { - for device in user.devices { - req.apns.send( - .init(title: conversation.title, subtitle: msg.messageBody), - to: device.token - ) - } - } - } - } - - func send(_ msg: MessageItem, req: Request) { - - let messageCreate = MessageModel(msg, senderId: req.payload.user.id, receipientId: nil) - - req.db.withConnection { _ in - messageCreate.save(on: req.db) - }.whenComplete { [self] res in - - let success: Bool - - switch res { - case .failure(let err): - self.logger.report(error: err) - success = false - - case .success: - self.logger.info("success true") - success = true - } - - messageCreate.isDelivered = success - _ = messageCreate.update(on: req.db) - - let chatClients = self.activeClients.compactMap { $0 as? ChatClient } - - for client in chatClients where client.id != msg.sender!.id { - client.send(messageCreate, req) - } - - _ = sendNotificationToConversationMembers(messageCreate, req) - } - - } - - deinit { - let futures = self.allCliendts.values.map { $0.socket.close() } - try! self.eventLoop.flatten(futures).wait() - logger.debug("deinit call from WebsocketClients") - } - -} - diff --git a/Sources/App/routes.swift b/Sources/App/routes.swift index fedd449..82280c3 100644 --- a/Sources/App/routes.swift +++ b/Sources/App/routes.swift @@ -9,7 +9,7 @@ func routes(_ app: Application) throws { try app.group("v1") { api in let chat = api.grouped("chat") - let webSocketController = WebSocketController(eventLoop: app.eventLoopGroup.next(), db: app.db) - try chat.register(collection: ChatController(wsController: webSocketController) ) + let webSocketController = WebsocketHandle.init(wsClients: .init()) + try chat.register(collection: WebsocketController(wsController: webSocketController) ) } }