Skip to content

Commit

Permalink
Merge pull request #506 from WalletConnect/feature/relay-retry
Browse files Browse the repository at this point in the history
[Relay] Retry on connection error
  • Loading branch information
flypaper0 authored Sep 19, 2022
2 parents 93bef41 + 355e63b commit e583fea
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 67 deletions.
12 changes: 2 additions & 10 deletions Example/ExampleApp/SceneDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,8 @@ class SceneDelegate: UIResponder, UIWindowSceneDelegate {
return
}
let wcUri = incomingURL.absoluteString.deletingPrefix("https://walletconnect.com/wc?uri=")
let vc = ((window!.rootViewController as! UINavigationController).viewControllers[0] as! WalletViewController)
Task(priority: .high) {try? await Sign.instance.pair(uri: WalletConnectURI(string: wcUri)!)}
vc.onClientConnected = {
Task(priority: .high) {
do {
try await Sign.instance.pair(uri: WalletConnectURI(string: wcUri)!)
} catch {
print(error)
}
}
Task(priority: .high) {
try! await Sign.instance.pair(uri: WalletConnectURI(string: wcUri)!)
}
}
}
Expand Down
82 changes: 46 additions & 36 deletions Sources/WalletConnectRelay/Dispatching.swift
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
import Foundation
import Combine
import WalletConnectUtils

protocol Dispatching {
var onConnect: (() -> Void)? {get set}
var onDisconnect: (() -> Void)? {get set}
var onMessage: ((String) -> Void)? {get set}
func send(_ string: String) async throws
var onMessage: ((String) -> Void)? { get set }
var socketConnectionStatusPublisher: AnyPublisher<SocketConnectionStatus, Never> { get }
func send(_ string: String, completion: @escaping (Error?) -> Void)
func protectedSend(_ string: String, completion: @escaping (Error?) -> Void)
func protectedSend(_ string: String) async throws
func connect() throws
func disconnect(closeCode: URLSessionWebSocketTask.CloseCode) throws
}

final class Dispatcher: NSObject, Dispatching {
var onConnect: (() -> Void)?
var onDisconnect: (() -> Void)?
var onMessage: ((String) -> Void)?
private var textFramesQueue = Queue<String>()
private let logger: ConsoleLogging
var socket: WebSocketConnecting
var socketConnectionHandler: SocketConnectionHandler

private let logger: ConsoleLogging
private let defaultTimeout: Int = 5

private let socketConnectionStatusPublisherSubject = PassthroughSubject<SocketConnectionStatus, Never>()

var socketConnectionStatusPublisher: AnyPublisher<SocketConnectionStatus, Never> {
socketConnectionStatusPublisherSubject.eraseToAnyPublisher()
}

init(socket: WebSocketConnecting,
socketConnectionHandler: SocketConnectionHandler,
logger: ConsoleLogging) {
Expand All @@ -31,28 +37,43 @@ final class Dispatcher: NSObject, Dispatching {
setUpSocketConnectionObserving()
}

func send(_ string: String) async throws {
return try await withCheckedThrowingContinuation { continuation in
if socket.isConnected {
socket.write(string: string) {
continuation.resume(returning: ())
}
} else {
continuation.resume(throwing: NetworkError.webSocketNotConnected)
}
}
}

func send(_ string: String, completion: @escaping (Error?) -> Void) {
// TODO - add policy for retry and "single try"
if socket.isConnected {
self.socket.write(string: string) {
completion(nil)
}
// TODO - enqueue if fails
} else {
completion(NetworkError.webSocketNotConnected)
// textFramesQueue.enqueue(string)
}
}

func protectedSend(_ string: String, completion: @escaping (Error?) -> Void) {
guard !socket.isConnected else {
return send(string, completion: completion)
}

var cancellable: AnyCancellable?
cancellable = socketConnectionStatusPublisher.sink { [unowned self] status in
guard status == .connected else { return }
cancellable?.cancel()
send(string, completion: completion)
}

DispatchQueue.global().asyncAfter(deadline: .now() + .seconds(defaultTimeout)) {
completion(NetworkError.webSocketNotConnected)
cancellable?.cancel()
}
}

func protectedSend(_ string: String) async throws {
return try await withCheckedThrowingContinuation { continuation in
protectedSend(string) { error in
if let error = error {
continuation.resume(throwing: error)
} else {
continuation.resume(returning: ())
}
}
}
}

Expand All @@ -72,21 +93,10 @@ final class Dispatcher: NSObject, Dispatching {

private func setUpSocketConnectionObserving() {
socket.onConnect = { [unowned self] in
self.dequeuePendingTextFrames()
self.onConnect?()
self.socketConnectionStatusPublisherSubject.send(.connected)
}
socket.onDisconnect = { [unowned self] _ in
self.onDisconnect?()
}
}

private func dequeuePendingTextFrames() {
while let frame = textFramesQueue.dequeue() {
send(frame) { [unowned self] error in
if let error = error {
self.logger.error(error.localizedDescription)
}
}
self.socketConnectionStatusPublisherSubject.send(.disconnected)
}
}
}
16 changes: 6 additions & 10 deletions Sources/WalletConnectRelay/RelayClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ public final class RelayClient {
}

public var socketConnectionStatusPublisher: AnyPublisher<SocketConnectionStatus, Never> {
socketConnectionStatusPublisherSubject.eraseToAnyPublisher()
dispatcher.socketConnectionStatusPublisher
}

private let messagePublisherSubject = PassthroughSubject<(topic: String, message: String), Never>()
private let socketConnectionStatusPublisherSubject = PassthroughSubject<SocketConnectionStatus, Never>()

private let subscriptionResponsePublisherSubject = PassthroughSubject<(RPCID?, String), Never>()
private var subscriptionResponsePublisher: AnyPublisher<(RPCID?, String), Never> {
Expand Down Expand Up @@ -68,9 +67,6 @@ public final class RelayClient {
dispatcher.onMessage = { [weak self] payload in
self?.handlePayloadMessage(payload)
}
dispatcher.onConnect = { [unowned self] in
self.socketConnectionStatusPublisherSubject.send(.connected)
}
}

/// Instantiates Relay Client
Expand Down Expand Up @@ -132,7 +128,7 @@ public final class RelayClient {
.asRPCRequest()
let message = try request.asJSONEncodedString()
logger.debug("Publishing payload on topic: \(topic)")
try await dispatcher.send(message)
try await dispatcher.protectedSend(message)
}

/// Completes with an acknowledgement from the relay network.
Expand All @@ -156,7 +152,7 @@ public final class RelayClient {
cancellable?.cancel()
onNetworkAcknowledge(nil)
}
dispatcher.send(message) { [weak self] error in
dispatcher.protectedSend(message) { [weak self] error in
if let error = error {
self?.logger.debug("Failed to Publish Payload, error: \(error)")
cancellable?.cancel()
Expand All @@ -183,7 +179,7 @@ public final class RelayClient {
}
completion(nil)
}
dispatcher.send(message) { [weak self] error in
dispatcher.protectedSend(message) { [weak self] error in
if let error = error {
self?.logger.debug("Failed to subscribe to topic \(error)")
cancellable?.cancel()
Expand Down Expand Up @@ -223,7 +219,7 @@ public final class RelayClient {
cancellable?.cancel()
completion(nil)
}
dispatcher.send(message) { [weak self] error in
dispatcher.protectedSend(message) { [weak self] error in
if let error = error {
self?.logger.debug("Failed to unsubscribe from topic")
cancellable?.cancel()
Expand Down Expand Up @@ -279,7 +275,7 @@ public final class RelayClient {
private func acknowledgeRequest(_ request: RPCRequest) throws {
let response = RPCResponse(matchingRequest: request, result: true)
let message = try response.asJSONEncodedString()
dispatcher.send(message) { [unowned self] in
dispatcher.protectedSend(message) { [unowned self] in
if let error = $0 {
logger.debug("Failed to dispatch response: \(response), error: \(error)")
} else {
Expand Down
15 changes: 10 additions & 5 deletions Tests/RelayerTests/DispatcherTests.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import Foundation
import XCTest
import Combine
@testable import WalletConnectRelay
import TestingUtils
import Combine
Expand Down Expand Up @@ -29,6 +30,7 @@ class WebSocketMock: WebSocketConnecting {
}

final class DispatcherTests: XCTestCase {
var publishers = Set<AnyCancellable>()
var sut: Dispatcher!
var webSocket: WebSocketMock!
var networkMonitor: NetworkMonitoringMock!
Expand Down Expand Up @@ -66,18 +68,21 @@ final class DispatcherTests: XCTestCase {

func testOnConnect() {
let expectation = expectation(description: "on connect")
sut.onConnect = {
sut.socketConnectionStatusPublisher.sink { status in
guard status == .connected else { return }
expectation.fulfill()
}
}.store(in: &publishers)
webSocket.onConnect?()
waitForExpectations(timeout: 0.001)
}

func testOnDisconnect() {
func testOnDisconnect() throws {
let expectation = expectation(description: "on disconnect")
sut.onDisconnect = {
try sut.connect()
sut.socketConnectionStatusPublisher.sink { status in
guard status == .disconnected else { return }
expectation.fulfill()
}
}.store(in: &publishers)
webSocket.onDisconnect?(nil)
waitForExpectations(timeout: 0.001)
}
Expand Down
28 changes: 22 additions & 6 deletions Tests/RelayerTests/Mocks/DispatcherMock.swift
Original file line number Diff line number Diff line change
@@ -1,18 +1,34 @@
import Foundation
import JSONRPC
import Combine
@testable import WalletConnectRelay

class DispatcherMock: Dispatching {
private var publishers = Set<AnyCancellable>()
private let socketConnectionStatusPublisherSubject = CurrentValueSubject<SocketConnectionStatus, Never>(.disconnected)
var socketConnectionStatusPublisher: AnyPublisher<SocketConnectionStatus, Never> {
return socketConnectionStatusPublisherSubject.eraseToAnyPublisher()
}

var onConnect: (() -> Void)?
var onDisconnect: (() -> Void)?
var sent = false
var lastMessage: String = ""
var onMessage: ((String) -> Void)?

func connect() {}
func disconnect(closeCode: URLSessionWebSocketTask.CloseCode) {}
func protectedSend(_ string: String, completion: @escaping (Error?) -> Void) {
send(string, completion: completion)
}

var sent = false
var lastMessage: String = ""
func protectedSend(_ string: String) async throws {
try await send(string)
}

func connect() {
socketConnectionStatusPublisherSubject.send(.connected)
}

func disconnect(closeCode: URLSessionWebSocketTask.CloseCode) {
socketConnectionStatusPublisherSubject.send(.disconnected)
}

func send(_ string: String, completion: @escaping (Error?) -> Void) {
sent = true
Expand Down

0 comments on commit e583fea

Please sign in to comment.