Skip to content

Commit

Permalink
PR suggetsions
Browse files Browse the repository at this point in the history
  • Loading branch information
flypaper0 committed Sep 15, 2022
1 parent 51a0bcd commit e8a2762
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 63 deletions.
55 changes: 46 additions & 9 deletions Sources/WalletConnectRelay/Dispatching.swift
Original file line number Diff line number Diff line change
@@ -1,24 +1,31 @@
import Foundation
import Combine
import WalletConnectUtils

protocol Dispatching {
var onConnect: (() -> Void)? {get set}
var onDisconnect: (() -> Void)? {get set}
var onMessage: ((String) -> Void)? {get set}
var onMessage: ((String) -> Void)? { get set }
var socketConnectionStatusPublisher: AnyPublisher<SocketConnectionStatus, Never> { get }
func send(_ string: String, completion: @escaping (Error?) -> Void)
func protectedSend(_ string: String, completion: @escaping (Error?) -> Void)
func protectedSend(_ string: String) async throws
func connect() throws
func disconnect(closeCode: URLSessionWebSocketTask.CloseCode) throws
}

final class Dispatcher: NSObject, Dispatching {
var onConnect: (() -> Void)?
var onDisconnect: (() -> Void)?
var onMessage: ((String) -> Void)?
private var textFramesQueue = Queue<String>()
private let logger: ConsoleLogging
var socket: WebSocketConnecting
var socketConnectionHandler: SocketConnectionHandler

private let logger: ConsoleLogging
private let defaultTimeout: Int = 5

private let socketConnectionStatusPublisherSubject = CurrentValueSubject<SocketConnectionStatus, Never>(.disconnected)

var socketConnectionStatusPublisher: AnyPublisher<SocketConnectionStatus, Never> {
socketConnectionStatusPublisherSubject.eraseToAnyPublisher()
}

init(socket: WebSocketConnecting,
socketConnectionHandler: SocketConnectionHandler,
logger: ConsoleLogging) {
Expand All @@ -40,6 +47,36 @@ final class Dispatcher: NSObject, Dispatching {
}
}

func protectedSend(_ string: String, completion: @escaping (Error?) -> Void) {
guard socketConnectionStatusPublisherSubject.value == .disconnected else {
return send(string, completion: completion)
}

var cancellable: AnyCancellable?
cancellable = socketConnectionStatusPublisher.sink { [unowned self] status in
guard status == .connected else { return }
defer { cancellable?.cancel() }
send(string, completion: completion)
}

DispatchQueue.global().asyncAfter(deadline: .now() + .seconds(defaultTimeout)) {
completion(NetworkError.webSocketNotConnected)
cancellable?.cancel()
}
}

func protectedSend(_ string: String) async throws {
return try await withCheckedThrowingContinuation { continuation in
protectedSend(string) { error in
if let error = error {
continuation.resume(throwing: error)
} else {
continuation.resume(returning: ())
}
}
}
}

func connect() throws {
try socketConnectionHandler.handleConnect()
}
Expand All @@ -56,10 +93,10 @@ final class Dispatcher: NSObject, Dispatching {

private func setUpSocketConnectionObserving() {
socket.onConnect = { [unowned self] in
self.onConnect?()
self.socketConnectionStatusPublisherSubject.send(.connected)
}
socket.onDisconnect = { [unowned self] _ in
self.onDisconnect?()
self.socketConnectionStatusPublisherSubject.send(.disconnected)
}
}
}
49 changes: 6 additions & 43 deletions Sources/WalletConnectRelay/RelayClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ public final class RelayClient {
}

public var socketConnectionStatusPublisher: AnyPublisher<SocketConnectionStatus, Never> {
socketConnectionStatusPublisherSubject.eraseToAnyPublisher()
dispatcher.socketConnectionStatusPublisher
}

private let messagePublisherSubject = PassthroughSubject<(topic: String, message: String), Never>()
private let socketConnectionStatusPublisherSubject = CurrentValueSubject<SocketConnectionStatus, Never>(.disconnected)

private let subscriptionResponsePublisherSubject = PassthroughSubject<(RPCID?, String), Never>()
private var subscriptionResponsePublisher: AnyPublisher<(RPCID?, String), Never> {
Expand Down Expand Up @@ -68,12 +67,6 @@ public final class RelayClient {
dispatcher.onMessage = { [weak self] payload in
self?.handlePayloadMessage(payload)
}
dispatcher.onConnect = { [unowned self] in
self.socketConnectionStatusPublisherSubject.send(.connected)
}
dispatcher.onDisconnect = {
self.socketConnectionStatusPublisherSubject.send(.disconnected)
}
}

/// Instantiates Relay Client
Expand Down Expand Up @@ -135,7 +128,7 @@ public final class RelayClient {
.asRPCRequest()
let message = try request.asJSONEncodedString()
logger.debug("Publishing payload on topic: \(topic)")
try await protectedSend(message)
try await dispatcher.protectedSend(message)
}

/// Completes with an acknowledgement from the relay network.
Expand All @@ -159,7 +152,7 @@ public final class RelayClient {
cancellable?.cancel()
onNetworkAcknowledge(nil)
}
protectedSend(message) { [weak self] error in
dispatcher.protectedSend(message) { [weak self] error in
if let error = error {
self?.logger.debug("Failed to Publish Payload, error: \(error)")
cancellable?.cancel()
Expand All @@ -186,7 +179,7 @@ public final class RelayClient {
}
completion(nil)
}
protectedSend(message) { [weak self] error in
dispatcher.protectedSend(message) { [weak self] error in
if let error = error {
self?.logger.debug("Failed to subscribe to topic \(error)")
cancellable?.cancel()
Expand Down Expand Up @@ -226,7 +219,7 @@ public final class RelayClient {
cancellable?.cancel()
completion(nil)
}
protectedSend(message) { [weak self] error in
dispatcher.protectedSend(message) { [weak self] error in
if let error = error {
self?.logger.debug("Failed to unsubscribe from topic")
cancellable?.cancel()
Expand Down Expand Up @@ -282,7 +275,7 @@ public final class RelayClient {
private func acknowledgeRequest(_ request: RPCRequest) throws {
let response = RPCResponse(matchingRequest: request, result: true)
let message = try response.asJSONEncodedString()
protectedSend(message) { [unowned self] in
dispatcher.protectedSend(message) { [unowned self] in
if let error = $0 {
logger.debug("Failed to dispatch response: \(response), error: \(error)")
} else {
Expand All @@ -294,34 +287,4 @@ public final class RelayClient {
}
}
}

private func protectedSend(_ string: String) async throws {
return try await withCheckedThrowingContinuation { continuation in
protectedSend(string) { error in
if let error = error {
continuation.resume(throwing: error)
} else {
continuation.resume(returning: ())
}
}
}
}

private func protectedSend(_ string: String, timeout: Int = 5, completion: @escaping (Error?) -> Void) {
guard socketConnectionStatusPublisherSubject.value == .disconnected else {
return dispatcher.send(string, completion: completion)
}

var cancellable: AnyCancellable?
cancellable = socketConnectionStatusPublisher.sink { [unowned self] status in
guard status == .connected else { return }
defer { cancellable?.cancel() }
dispatcher.send(string, completion: completion)
}

concurrentQueue.asyncAfter(deadline: .now() + .seconds(timeout)) {
completion(NetworkError.webSocketNotConnected)
cancellable?.cancel()
}
}
}
15 changes: 10 additions & 5 deletions Tests/RelayerTests/DispatcherTests.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import Foundation
import XCTest
import Combine
@testable import WalletConnectRelay
import TestingUtils
import Combine
Expand Down Expand Up @@ -29,6 +30,7 @@ class WebSocketMock: WebSocketConnecting {
}

final class DispatcherTests: XCTestCase {
var publishers = Set<AnyCancellable>()
var sut: Dispatcher!
var webSocket: WebSocketMock!
var networkMonitor: NetworkMonitoringMock!
Expand Down Expand Up @@ -66,18 +68,21 @@ final class DispatcherTests: XCTestCase {

func testOnConnect() {
let expectation = expectation(description: "on connect")
sut.onConnect = {
sut.socketConnectionStatusPublisher.sink { status in
guard status == .connected else { return }
expectation.fulfill()
}
}.store(in: &publishers)
webSocket.onConnect?()
waitForExpectations(timeout: 0.001)
}

func testOnDisconnect() {
func testOnDisconnect() throws {
let expectation = expectation(description: "on disconnect")
sut.onDisconnect = {
try sut.connect()
sut.socketConnectionStatusPublisher.sink { status in
guard status == .disconnected else { return }
expectation.fulfill()
}
}.store(in: &publishers)
webSocket.onDisconnect?(nil)
waitForExpectations(timeout: 0.001)
}
Expand Down
28 changes: 22 additions & 6 deletions Tests/RelayerTests/Mocks/DispatcherMock.swift
Original file line number Diff line number Diff line change
@@ -1,18 +1,34 @@
import Foundation
import JSONRPC
import Combine
@testable import WalletConnectRelay

class DispatcherMock: Dispatching {
private var publishers = Set<AnyCancellable>()
private let socketConnectionStatusPublisherSubject = CurrentValueSubject<SocketConnectionStatus, Never>(.disconnected)
var socketConnectionStatusPublisher: AnyPublisher<SocketConnectionStatus, Never> {
return socketConnectionStatusPublisherSubject.eraseToAnyPublisher()
}

var onConnect: (() -> Void)?
var onDisconnect: (() -> Void)?
var sent = false
var lastMessage: String = ""
var onMessage: ((String) -> Void)?

func connect() {}
func disconnect(closeCode: URLSessionWebSocketTask.CloseCode) {}
func protectedSend(_ string: String, completion: @escaping (Error?) -> Void) {
send(string, completion: completion)
}

var sent = false
var lastMessage: String = ""
func protectedSend(_ string: String) async throws {
try await send(string)
}

func connect() {
socketConnectionStatusPublisherSubject.send(.connected)
}

func disconnect(closeCode: URLSessionWebSocketTask.CloseCode) {
socketConnectionStatusPublisherSubject.send(.disconnected)
}

func send(_ string: String, completion: @escaping (Error?) -> Void) {
sent = true
Expand Down

0 comments on commit e8a2762

Please sign in to comment.