Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#296 Exchange messages between clients #309

Merged
merged 2 commits into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 46 additions & 13 deletions Example/IntegrationTests/Chat/ChatTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,18 @@ final class ChatTests: XCTestCase {
}

func testInvite() async {
// await waitClientsConnected()
// let inviteExpectation = expectation(description: "invitation expectation")
// let account = Account(chainIdentifier: "eip155:1", address: "0x3627523167367216556273151")!
// let pubKey = try! await invitee.register(account: account)
// try! await inviter.invite(publicKey: pubKey, openingMessage: "")
// invitee.invitePublisher.sink { _ in
// inviteExpectation.fulfill()
// }.store(in: &publishers)
// wait(for: [inviteExpectation], timeout: 4)
await waitClientsConnected()
let inviteExpectation = expectation(description: "invitation expectation")
let account = Account(chainIdentifier: "eip155:1", address: "0x3627523167367216556273151")!
let pubKey = try! await invitee.register(account: account)
try! await inviter.invite(publicKey: pubKey, openingMessage: "")
invitee.invitePublisher.sink { _ in
inviteExpectation.fulfill()
}.store(in: &publishers)
wait(for: [inviteExpectation], timeout: 4)
}

func testAcceptAndCreateNewThread() async {
//
// func testAcceptAndCreateNewThread() async {
// await waitClientsConnected()
// let newThreadInviterExpectation = expectation(description: "new thread on inviting client expectation")
// let newThreadinviteeExpectation = expectation(description: "new thread on invitee client expectation")
Expand All @@ -78,6 +78,39 @@ final class ChatTests: XCTestCase {
// newThreadInviterExpectation.fulfill()
// }.store(in: &publishers)
//
// wait(for: [newThreadinviteeExpectation, newThreadInviterExpectation], timeout: 4)
}
// wait(for: [newThreadinviteeExpectation, newThreadInviterExpectation], timeout: 30)
// }
//
// func testMessage() async {
// await waitClientsConnected()
// let messageExpectation = expectation(description: "message received")
// messageExpectation.expectedFulfillmentCount = 2
// let message = "message"
//
// let account = Account(chainIdentifier: "eip155:1", address: "0x3627523167367216556273151")!
// let pubKey = try! await invitee.register(account: account)
// try! await inviter.invite(publicKey: pubKey, openingMessage: "opening message")
//
// invitee.invitePublisher.sink { [unowned self] inviteEnvelope in
// Task {try! await invitee.accept(inviteId: inviteEnvelope.pubKey)}
// }.store(in: &publishers)
//
// invitee.newThreadPublisher.sink { [unowned self] thread in
// Task {try! await invitee.message(topic: thread.topic, message: message)}
// }.store(in: &publishers)
//
// inviter.newThreadPublisher.sink { [unowned self] thread in
// Task {try! await inviter.message(topic: thread.topic, message: message)}
// }.store(in: &publishers)
//
// inviter.messagePublisher.sink { message in
// messageExpectation.fulfill()
// }.store(in: &publishers)
//
// invitee.messagePublisher.sink { message in
// messageExpectation.fulfill()
// }.store(in: &publishers)
//
// wait(for: [messageExpectation], timeout: 35)
// }
}
63 changes: 48 additions & 15 deletions Sources/Chat/Chat.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,31 @@ import Combine

class Chat {
private var publishers = [AnyCancellable]()
let registry: Registry
let registryService: RegistryService
let invitationHandlingService: InvitationHandlingService
let inviteService: InviteService
private let registry: Registry
private let registryService: RegistryService
private let messagingService: MessagingService
private let invitationHandlingService: InvitationHandlingService
private let inviteService: InviteService
let kms: KeyManagementService
let threadStore: CodableStore<Thread>

let socketConnectionStatusPublisher: AnyPublisher<SocketConnectionStatus, Never>

var newThreadPublisherSubject = PassthroughSubject<String, Never>()
public var newThreadPublisher: AnyPublisher<String, Never> {
private var newThreadPublisherSubject = PassthroughSubject<Thread, Never>()
public var newThreadPublisher: AnyPublisher<Thread, Never> {
newThreadPublisherSubject.eraseToAnyPublisher()
}

var invitePublisherSubject = PassthroughSubject<InviteEnvelope, Never>()
private var invitePublisherSubject = PassthroughSubject<InviteEnvelope, Never>()
public var invitePublisher: AnyPublisher<InviteEnvelope, Never> {
invitePublisherSubject.eraseToAnyPublisher()
}

private var messagePublisherSubject = PassthroughSubject<Message, Never>()
public var messagePublisher: AnyPublisher<Message, Never> {
messagePublisherSubject.eraseToAnyPublisher()
}

init(registry: Registry,
relayClient: RelayClient,
kms: KeyManagementService,
Expand All @@ -41,14 +48,16 @@ class Chat {
jsonRpcHistory: jsonRpcHistory)
let invitePayloadStore = CodableStore<RequestSubscriptionPayload>(defaults: keyValueStorage, identifier: StorageDomainIdentifiers.invite.rawValue)
self.registryService = RegistryService(registry: registry, networkingInteractor: networkingInteractor, kms: kms, logger: logger, topicToInvitationPubKeyStore: topicToInvitationPubKeyStore)
threadStore = CodableStore<Thread>(defaults: keyValueStorage, identifier: StorageDomainIdentifiers.threads.rawValue)
self.invitationHandlingService = InvitationHandlingService(registry: registry,
networkingInteractor: networkingInteractor,
kms: kms,
logger: logger,
topicToInvitationPubKeyStore: topicToInvitationPubKeyStore,
invitePayloadStore: invitePayloadStore,
threadsStore: CodableStore<Thread>(defaults: keyValueStorage, identifier: StorageDomainIdentifiers.threads.rawValue))
threadsStore: threadStore)
self.inviteService = InviteService(networkingInteractor: networkingInteractor, kms: kms, logger: logger)
self.messagingService = MessagingService(networkingInteractor: networkingInteractor, logger: logger)
socketConnectionStatusPublisher = relayClient.socketConnectionStatusPublisher
setUpEnginesCallbacks()
}
Expand All @@ -57,44 +66,64 @@ class Chat {
/// record is a blockchain account with a client generated public key
/// - Parameter account: CAIP10 blockchain account
/// - Returns: public key
func register(account: Account) async throws -> String {
public func register(account: Account) async throws -> String {
try await registryService.register(account: account)
}

/// Queries the default keyserver with a blockchain account
/// - Parameter account: CAIP10 blockachain account
/// - Returns: public key associated with an account in chat's keyserver
func resolve(account: Account) async throws -> String {
public func resolve(account: Account) async throws -> String {
try await registry.resolve(account: account)
}

/// Sends a chat invite with opening message
/// - Parameters:
/// - publicKey: publicKey associated with a peer
/// - openingMessage: oppening message for a chat invite
func invite(publicKey: String, openingMessage: String) async throws {
public func invite(publicKey: String, openingMessage: String) async throws {
// TODO - how to provide account?
// in init or in invite method's params
let tempAccount = Account("eip155:1:33e32e32")!
try await inviteService.invite(peerPubKey: publicKey, openingMessage: openingMessage, account: tempAccount)
}

func accept(inviteId: String) async throws {
public func accept(inviteId: String) async throws {
try await invitationHandlingService.accept(inviteId: inviteId)
}

public func reject(inviteId: String) async throws {

}

/// Sends a chat message to an active chat thread
/// - Parameters:
/// - topic: thread topic
/// - message: chat message
func message(topic: String, message: String) {

public func message(topic: String, message: String) async throws {
try await messagingService.send(topic: topic, messageString: message)
}

/// To Ping peer client
/// - Parameter topic: chat thread topic
func ping(topic: String) {
public func ping(topic: String) {
fatalError("not implemented")
}

public func leave(topic: String) async throws {
fatalError("not implemented")
}

public func getInvites(account: Account) -> [Invite] {
fatalError("not implemented")
}

public func getThreads(account: Account) -> [Thread] {
fatalError("not implemented")
}

public func getMessages(topic: String) -> [Message] {
fatalError("not implemented")
}

private func setUpEnginesCallbacks() {
Expand All @@ -107,5 +136,9 @@ class Chat {
inviteService.onNewThread = { [unowned self] newThread in
newThreadPublisherSubject.send(newThread)
}
messagingService.onMessage = { [unowned self] message in
messagePublisherSubject.send(message)
}
}
}

23 changes: 3 additions & 20 deletions Sources/Chat/NetworkingInteractor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ protocol NetworkInteracting {
var requestPublisher: AnyPublisher<RequestSubscriptionPayload, Never> {get}
var responsePublisher: AnyPublisher<ChatResponse, Never> {get}
func subscribe(topic: String) async throws
func requestUnencrypted(_ request: JSONRPCRequest<ChatRequestParams>, topic: String) async throws
func request(_ request: JSONRPCRequest<ChatRequestParams>, topic: String, envelopeType: Envelope.EnvelopeType) async throws
func respond(topic: String, response: JsonRpcResult) async throws
}
Expand Down Expand Up @@ -51,13 +50,6 @@ class NetworkingInteractor: NetworkInteracting {
}
}

// TODO - remove the method
func requestUnencrypted(_ request: JSONRPCRequest<ChatRequestParams>, topic: String) async throws {
try jsonRpcHistory.set(topic: topic, request: request)
let message = try! request.json()
try await relayClient.publish(topic: topic, payload: message, tag: .chat)
}

func request(_ request: JSONRPCRequest<ChatRequestParams>, topic: String, envelopeType: Envelope.EnvelopeType) async throws {
try jsonRpcHistory.set(topic: topic, request: request)
let message = try! serializer.serialize(topic: topic, encodable: request, envelopeType: envelopeType)
Expand All @@ -75,26 +67,17 @@ class NetworkingInteractor: NetworkInteracting {

private func manageSubscription(_ topic: String, _ encodedEnvelope: String) {
if let deserializedJsonRpcRequest: JSONRPCRequest<ChatRequestParams> = serializer.tryDeserialize(topic: topic, encodedEnvelope: encodedEnvelope) {
handleWCRequest(topic: topic, request: deserializedJsonRpcRequest)
} else if let decodedJsonRpcRequest: JSONRPCRequest<ChatRequestParams> = tryDecodeRequest(message: encodedEnvelope) {
handleWCRequest(topic: topic, request: decodedJsonRpcRequest)
handleChatRequest(topic: topic, request: deserializedJsonRpcRequest)
} else if let deserializedJsonRpcResponse: JSONRPCResponse<AnyCodable> = serializer.tryDeserialize(topic: topic, encodedEnvelope: encodedEnvelope) {
handleJsonRpcResponse(response: deserializedJsonRpcResponse)
} else if let deserializedJsonRpcError: JSONRPCErrorResponse = serializer.tryDeserialize(topic: topic, encodedEnvelope: encodedEnvelope) {
handleJsonRpcErrorResponse(response: deserializedJsonRpcError)
} else {
print("Warning: WalletConnect Relay - Received unknown object type from networking relay")
}
}

private func tryDecodeRequest(message: String) -> JSONRPCRequest<ChatRequestParams>? {
guard let messageData = message.data(using: .utf8) else {
return nil
print("Warning: Networking Interactor - Received unknown object type from networking relay")
}
return try? JSONDecoder().decode(JSONRPCRequest<ChatRequestParams>.self, from: messageData)
}

private func handleWCRequest(topic: String, request: JSONRPCRequest<ChatRequestParams>) {
private func handleChatRequest(topic: String, request: JSONRPCRequest<ChatRequestParams>) {
let payload = RequestSubscriptionPayload(topic: topic, request: request)
requestPublisherSubject.send(payload)
}
Expand Down
57 changes: 57 additions & 0 deletions Sources/Chat/ProtocolServices/Common/MessagingService.swift
Original file line number Diff line number Diff line change
@@ -1 +1,58 @@
import Foundation
import WalletConnectUtils
import Combine

class MessagingService {
let networkingInteractor: NetworkInteracting
let logger: ConsoleLogging
var onMessage: ((Message) -> Void)?
private var publishers = [AnyCancellable]()

init(networkingInteractor: NetworkInteracting,
logger: ConsoleLogging) {
self.networkingInteractor = networkingInteractor
self.logger = logger
setUpResponseHandling()
setUpRequestHandling()
}

func send(topic: String, messageString: String) async throws {
//TODO - manage author account
let authorAccount = "TODO"
let message = Message(message: messageString, authorAccount: authorAccount, timestamp: JsonRpcID.generate())
let request = JSONRPCRequest<ChatRequestParams>(params: .message(message))
try await networkingInteractor.request(request, topic: topic, envelopeType: .type0)
}

private func setUpResponseHandling() {
networkingInteractor.responsePublisher
.sink { [unowned self] response in
switch response.requestParams {
case .message:
handleMessageResponse(response)
default:
return
}
}.store(in: &publishers)
}

private func setUpRequestHandling() {
networkingInteractor.requestPublisher.sink { [unowned self] subscriptionPayload in
switch subscriptionPayload.request.params {
case .message(let message):
handleMessage(message)
default:
return
}
}.store(in: &publishers)
}

private func handleMessage(_ message: Message) {
onMessage?(message)
logger.debug("Received message")
}

private func handleMessageResponse(_ response: ChatResponse) {
logger.debug("Received Message response")
}
}
Loading