Skip to content

Commit

Permalink
Update thread creation
Browse files Browse the repository at this point in the history
  • Loading branch information
flypaper0 committed Jul 7, 2022
1 parent f36fe6b commit cb1f1bb
Show file tree
Hide file tree
Showing 13 changed files with 235 additions and 116 deletions.
91 changes: 62 additions & 29 deletions Example/IntegrationTests/Chat/ChatTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,37 +47,70 @@ final class ChatTests: XCTestCase {
}

func testInvite() async {
// await waitClientsConnected()
// let inviteExpectation = expectation(description: "invitation expectation")
// let account = Account(chainIdentifier: "eip155:1", address: "0x3627523167367216556273151")!
// let pubKey = try! await invitee.register(account: account)
// try! await inviter.invite(publicKey: pubKey, openingMessage: "")
// invitee.invitePublisher.sink { _ in
// inviteExpectation.fulfill()
// }.store(in: &publishers)
// wait(for: [inviteExpectation], timeout: 4)
await waitClientsConnected()
let inviteExpectation = expectation(description: "invitation expectation")
let account = Account(chainIdentifier: "eip155:1", address: "0x3627523167367216556273151")!
let pubKey = try! await invitee.register(account: account)
try! await inviter.invite(publicKey: pubKey, openingMessage: "")
invitee.invitePublisher.sink { _ in
inviteExpectation.fulfill()
}.store(in: &publishers)
wait(for: [inviteExpectation], timeout: 4)
}

func testAcceptAndCreateNewThread() async {
// await waitClientsConnected()
// let newThreadInviterExpectation = expectation(description: "new thread on inviting client expectation")
// let newThreadinviteeExpectation = expectation(description: "new thread on invitee client expectation")
// let account = Account(chainIdentifier: "eip155:1", address: "0x3627523167367216556273151")!
// let pubKey = try! await invitee.register(account: account)
// try! await inviter.invite(publicKey: pubKey, openingMessage: "opening message")
//
// invitee.invitePublisher.sink { [unowned self] inviteEnvelope in
// Task {try! await invitee.accept(inviteId: inviteEnvelope.pubKey)}
// }.store(in: &publishers)
//
// invitee.newThreadPublisher.sink { _ in
// newThreadinviteeExpectation.fulfill()
// }.store(in: &publishers)
//
// inviter.newThreadPublisher.sink { _ in
// newThreadInviterExpectation.fulfill()
// }.store(in: &publishers)
//
// wait(for: [newThreadinviteeExpectation, newThreadInviterExpectation], timeout: 4)
await waitClientsConnected()
let newThreadInviterExpectation = expectation(description: "new thread on inviting client expectation")
let newThreadinviteeExpectation = expectation(description: "new thread on invitee client expectation")
let account = Account(chainIdentifier: "eip155:1", address: "0x3627523167367216556273151")!
let pubKey = try! await invitee.register(account: account)
try! await inviter.invite(publicKey: pubKey, openingMessage: "opening message")

invitee.invitePublisher.sink { [unowned self] inviteEnvelope in
Task {try! await invitee.accept(inviteId: inviteEnvelope.pubKey)}
}.store(in: &publishers)

invitee.newThreadPublisher.sink { _ in
newThreadinviteeExpectation.fulfill()
}.store(in: &publishers)

inviter.newThreadPublisher.sink { _ in
newThreadInviterExpectation.fulfill()
}.store(in: &publishers)

wait(for: [newThreadinviteeExpectation, newThreadInviterExpectation], timeout: 30)
}

func testMessage() async {
await waitClientsConnected()
let messageExpectation = expectation(description: "message received")
messageExpectation.expectedFulfillmentCount = 2
let message = "message"

let account = Account(chainIdentifier: "eip155:1", address: "0x3627523167367216556273151")!
let pubKey = try! await invitee.register(account: account)
try! await inviter.invite(publicKey: pubKey, openingMessage: "opening message")

invitee.invitePublisher.sink { [unowned self] inviteEnvelope in
Task {try! await invitee.accept(inviteId: inviteEnvelope.pubKey)}
}.store(in: &publishers)

invitee.newThreadPublisher.sink { [unowned self] thread in
Task {try! await invitee.message(topic: thread.topic, message: message)}
}.store(in: &publishers)

inviter.newThreadPublisher.sink { [unowned self] thread in
Task {try! await inviter.message(topic: thread.topic, message: message)}
}.store(in: &publishers)

inviter.messagePublisher.sink { message in
messageExpectation.fulfill()
}.store(in: &publishers)

invitee.messagePublisher.sink { message in
messageExpectation.fulfill()
}.store(in: &publishers)

wait(for: [messageExpectation], timeout: 35)
}
}
63 changes: 48 additions & 15 deletions Sources/Chat/Chat.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,31 @@ import Combine

class Chat {
private var publishers = [AnyCancellable]()
let registry: Registry
let registryService: RegistryService
let invitationHandlingService: InvitationHandlingService
let inviteService: InviteService
private let registry: Registry
private let registryService: RegistryService
private let messagingService: MessagingService
private let invitationHandlingService: InvitationHandlingService
private let inviteService: InviteService
let kms: KeyManagementService
let threadStore: CodableStore<Thread>

let socketConnectionStatusPublisher: AnyPublisher<SocketConnectionStatus, Never>

var newThreadPublisherSubject = PassthroughSubject<String, Never>()
public var newThreadPublisher: AnyPublisher<String, Never> {
private var newThreadPublisherSubject = PassthroughSubject<Thread, Never>()
public var newThreadPublisher: AnyPublisher<Thread, Never> {
newThreadPublisherSubject.eraseToAnyPublisher()
}

var invitePublisherSubject = PassthroughSubject<InviteEnvelope, Never>()
private var invitePublisherSubject = PassthroughSubject<InviteEnvelope, Never>()
public var invitePublisher: AnyPublisher<InviteEnvelope, Never> {
invitePublisherSubject.eraseToAnyPublisher()
}

private var messagePublisherSubject = PassthroughSubject<Message, Never>()
public var messagePublisher: AnyPublisher<Message, Never> {
messagePublisherSubject.eraseToAnyPublisher()
}

init(registry: Registry,
relayClient: RelayClient,
kms: KeyManagementService,
Expand All @@ -41,14 +48,16 @@ class Chat {
jsonRpcHistory: jsonRpcHistory)
let invitePayloadStore = CodableStore<RequestSubscriptionPayload>(defaults: keyValueStorage, identifier: StorageDomainIdentifiers.invite.rawValue)
self.registryService = RegistryService(registry: registry, networkingInteractor: networkingInteractor, kms: kms, logger: logger, topicToInvitationPubKeyStore: topicToInvitationPubKeyStore)
threadStore = CodableStore<Thread>(defaults: keyValueStorage, identifier: StorageDomainIdentifiers.threads.rawValue)
self.invitationHandlingService = InvitationHandlingService(registry: registry,
networkingInteractor: networkingInteractor,
kms: kms,
logger: logger,
topicToInvitationPubKeyStore: topicToInvitationPubKeyStore,
invitePayloadStore: invitePayloadStore,
threadsStore: CodableStore<Thread>(defaults: keyValueStorage, identifier: StorageDomainIdentifiers.threads.rawValue))
threadsStore: threadStore)
self.inviteService = InviteService(networkingInteractor: networkingInteractor, kms: kms, logger: logger)
self.messagingService = MessagingService(networkingInteractor: networkingInteractor, logger: logger)
socketConnectionStatusPublisher = relayClient.socketConnectionStatusPublisher
setUpEnginesCallbacks()
}
Expand All @@ -57,44 +66,64 @@ class Chat {
/// record is a blockchain account with a client generated public key
/// - Parameter account: CAIP10 blockchain account
/// - Returns: public key
func register(account: Account) async throws -> String {
public func register(account: Account) async throws -> String {
try await registryService.register(account: account)
}

/// Queries the default keyserver with a blockchain account
/// - Parameter account: CAIP10 blockachain account
/// - Returns: public key associated with an account in chat's keyserver
func resolve(account: Account) async throws -> String {
public func resolve(account: Account) async throws -> String {
try await registry.resolve(account: account)
}

/// Sends a chat invite with opening message
/// - Parameters:
/// - publicKey: publicKey associated with a peer
/// - openingMessage: oppening message for a chat invite
func invite(publicKey: String, openingMessage: String) async throws {
public func invite(publicKey: String, openingMessage: String) async throws {
// TODO - how to provide account?
// in init or in invite method's params
let tempAccount = Account("eip155:1:33e32e32")!
try await inviteService.invite(peerPubKey: publicKey, openingMessage: openingMessage, account: tempAccount)
}

func accept(inviteId: String) async throws {
public func accept(inviteId: String) async throws {
try await invitationHandlingService.accept(inviteId: inviteId)
}

public func reject(inviteId: String) async throws {

}

/// Sends a chat message to an active chat thread
/// - Parameters:
/// - topic: thread topic
/// - message: chat message
func message(topic: String, message: String) {

public func message(topic: String, message: String) async throws {
try await messagingService.send(topic: topic, messageString: message)
}

/// To Ping peer client
/// - Parameter topic: chat thread topic
func ping(topic: String) {
public func ping(topic: String) {
fatalError("not implemented")
}

public func leave(topic: String) async throws {
fatalError("not implemented")
}

public func getInvites(account: Account) -> [Invite] {
fatalError("not implemented")
}

public func getThreads(account: Account) -> [Thread] {
fatalError("not implemented")
}

public func getMessages(topic: String) -> [Message] {
fatalError("not implemented")
}

private func setUpEnginesCallbacks() {
Expand All @@ -107,5 +136,9 @@ class Chat {
inviteService.onNewThread = { [unowned self] newThread in
newThreadPublisherSubject.send(newThread)
}
messagingService.onMessage = { [unowned self] message in
messagePublisherSubject.send(message)
}
}
}

23 changes: 3 additions & 20 deletions Sources/Chat/NetworkingInteractor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ protocol NetworkInteracting {
var requestPublisher: AnyPublisher<RequestSubscriptionPayload, Never> {get}
var responsePublisher: AnyPublisher<ChatResponse, Never> {get}
func subscribe(topic: String) async throws
func requestUnencrypted(_ request: JSONRPCRequest<ChatRequestParams>, topic: String) async throws
func request(_ request: JSONRPCRequest<ChatRequestParams>, topic: String, envelopeType: Envelope.EnvelopeType) async throws
func respond(topic: String, response: JsonRpcResult) async throws
}
Expand Down Expand Up @@ -51,13 +50,6 @@ class NetworkingInteractor: NetworkInteracting {
}
}

// TODO - remove the method
func requestUnencrypted(_ request: JSONRPCRequest<ChatRequestParams>, topic: String) async throws {
try jsonRpcHistory.set(topic: topic, request: request)
let message = try! request.json()
try await relayClient.publish(topic: topic, payload: message, tag: .chat)
}

func request(_ request: JSONRPCRequest<ChatRequestParams>, topic: String, envelopeType: Envelope.EnvelopeType) async throws {
try jsonRpcHistory.set(topic: topic, request: request)
let message = try! serializer.serialize(topic: topic, encodable: request, envelopeType: envelopeType)
Expand All @@ -75,26 +67,17 @@ class NetworkingInteractor: NetworkInteracting {

private func manageSubscription(_ topic: String, _ encodedEnvelope: String) {
if let deserializedJsonRpcRequest: JSONRPCRequest<ChatRequestParams> = serializer.tryDeserialize(topic: topic, encodedEnvelope: encodedEnvelope) {
handleWCRequest(topic: topic, request: deserializedJsonRpcRequest)
} else if let decodedJsonRpcRequest: JSONRPCRequest<ChatRequestParams> = tryDecodeRequest(message: encodedEnvelope) {
handleWCRequest(topic: topic, request: decodedJsonRpcRequest)
handleChatRequest(topic: topic, request: deserializedJsonRpcRequest)
} else if let deserializedJsonRpcResponse: JSONRPCResponse<AnyCodable> = serializer.tryDeserialize(topic: topic, encodedEnvelope: encodedEnvelope) {
handleJsonRpcResponse(response: deserializedJsonRpcResponse)
} else if let deserializedJsonRpcError: JSONRPCErrorResponse = serializer.tryDeserialize(topic: topic, encodedEnvelope: encodedEnvelope) {
handleJsonRpcErrorResponse(response: deserializedJsonRpcError)
} else {
print("Warning: WalletConnect Relay - Received unknown object type from networking relay")
}
}

private func tryDecodeRequest(message: String) -> JSONRPCRequest<ChatRequestParams>? {
guard let messageData = message.data(using: .utf8) else {
return nil
print("Warning: Networking Interactor - Received unknown object type from networking relay")
}
return try? JSONDecoder().decode(JSONRPCRequest<ChatRequestParams>.self, from: messageData)
}

private func handleWCRequest(topic: String, request: JSONRPCRequest<ChatRequestParams>) {
private func handleChatRequest(topic: String, request: JSONRPCRequest<ChatRequestParams>) {
let payload = RequestSubscriptionPayload(topic: topic, request: request)
requestPublisherSubject.send(payload)
}
Expand Down
57 changes: 57 additions & 0 deletions Sources/Chat/ProtocolServices/Common/MessagingService.swift
Original file line number Diff line number Diff line change
@@ -1 +1,58 @@
import Foundation
import WalletConnectUtils
import Combine

class MessagingService {
let networkingInteractor: NetworkInteracting
let logger: ConsoleLogging
var onMessage: ((Message) -> Void)?
private var publishers = [AnyCancellable]()

init(networkingInteractor: NetworkInteracting,
logger: ConsoleLogging) {
self.networkingInteractor = networkingInteractor
self.logger = logger
setUpResponseHandling()
setUpRequestHandling()
}

func send(topic: String, messageString: String) async throws {
//TODO - manage author account
let authorAccount = "TODO"
let message = Message(message: messageString, authorAccount: authorAccount, timestamp: JsonRpcID.generate())
let request = JSONRPCRequest<ChatRequestParams>(params: .message(message))
try await networkingInteractor.request(request, topic: topic, envelopeType: .type0)
}

private func setUpResponseHandling() {
networkingInteractor.responsePublisher
.sink { [unowned self] response in
switch response.requestParams {
case .message:
handleMessageResponse(response)
default:
return
}
}.store(in: &publishers)
}

private func setUpRequestHandling() {
networkingInteractor.requestPublisher.sink { [unowned self] subscriptionPayload in
switch subscriptionPayload.request.params {
case .message(let message):
handleMessage(message)
default:
return
}
}.store(in: &publishers)
}

private func handleMessage(_ message: Message) {
onMessage?(message)
logger.debug("Received message")
}

private func handleMessageResponse(_ response: ChatResponse) {
logger.debug("Received Message response")
}
}
Loading

0 comments on commit cb1f1bb

Please sign in to comment.