Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Relay] Retry on connection error #506

Merged
merged 3 commits into from
Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions Example/ExampleApp/SceneDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,8 @@ class SceneDelegate: UIResponder, UIWindowSceneDelegate {
return
}
let wcUri = incomingURL.absoluteString.deletingPrefix("https://walletconnect.com/wc?uri=")
let vc = ((window!.rootViewController as! UINavigationController).viewControllers[0] as! WalletViewController)
Task(priority: .high) {try? await Sign.instance.pair(uri: WalletConnectURI(string: wcUri)!)}
vc.onClientConnected = {
Task(priority: .high) {
do {
try await Sign.instance.pair(uri: WalletConnectURI(string: wcUri)!)
} catch {
print(error)
}
}
Task(priority: .high) {
try! await Sign.instance.pair(uri: WalletConnectURI(string: wcUri)!)
}
}
}
Expand Down
82 changes: 46 additions & 36 deletions Sources/WalletConnectRelay/Dispatching.swift
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
import Foundation
import Combine
import WalletConnectUtils

protocol Dispatching {
var onConnect: (() -> Void)? {get set}
var onDisconnect: (() -> Void)? {get set}
var onMessage: ((String) -> Void)? {get set}
func send(_ string: String) async throws
var onMessage: ((String) -> Void)? { get set }
var socketConnectionStatusPublisher: AnyPublisher<SocketConnectionStatus, Never> { get }
func send(_ string: String, completion: @escaping (Error?) -> Void)
func protectedSend(_ string: String, completion: @escaping (Error?) -> Void)
func protectedSend(_ string: String) async throws
func connect() throws
func disconnect(closeCode: URLSessionWebSocketTask.CloseCode) throws
}

final class Dispatcher: NSObject, Dispatching {
var onConnect: (() -> Void)?
var onDisconnect: (() -> Void)?
var onMessage: ((String) -> Void)?
private var textFramesQueue = Queue<String>()
private let logger: ConsoleLogging
var socket: WebSocketConnecting
var socketConnectionHandler: SocketConnectionHandler

private let logger: ConsoleLogging
private let defaultTimeout: Int = 5

private let socketConnectionStatusPublisherSubject = PassthroughSubject<SocketConnectionStatus, Never>()

var socketConnectionStatusPublisher: AnyPublisher<SocketConnectionStatus, Never> {
socketConnectionStatusPublisherSubject.eraseToAnyPublisher()
}

init(socket: WebSocketConnecting,
socketConnectionHandler: SocketConnectionHandler,
logger: ConsoleLogging) {
Expand All @@ -31,28 +37,43 @@ final class Dispatcher: NSObject, Dispatching {
setUpSocketConnectionObserving()
}

func send(_ string: String) async throws {
return try await withCheckedThrowingContinuation { continuation in
if socket.isConnected {
socket.write(string: string) {
continuation.resume(returning: ())
}
} else {
continuation.resume(throwing: NetworkError.webSocketNotConnected)
}
}
}

func send(_ string: String, completion: @escaping (Error?) -> Void) {
// TODO - add policy for retry and "single try"
if socket.isConnected {
self.socket.write(string: string) {
completion(nil)
}
// TODO - enqueue if fails
} else {
completion(NetworkError.webSocketNotConnected)
// textFramesQueue.enqueue(string)
}
}

func protectedSend(_ string: String, completion: @escaping (Error?) -> Void) {
guard !socket.isConnected else {
return send(string, completion: completion)
}

var cancellable: AnyCancellable?
cancellable = socketConnectionStatusPublisher.sink { [unowned self] status in
guard status == .connected else { return }
cancellable?.cancel()
send(string, completion: completion)
}

DispatchQueue.global().asyncAfter(deadline: .now() + .seconds(defaultTimeout)) {
completion(NetworkError.webSocketNotConnected)
cancellable?.cancel()
}
}

func protectedSend(_ string: String) async throws {
return try await withCheckedThrowingContinuation { continuation in
protectedSend(string) { error in
if let error = error {
continuation.resume(throwing: error)
} else {
continuation.resume(returning: ())
}
}
}
}

Expand All @@ -72,21 +93,10 @@ final class Dispatcher: NSObject, Dispatching {

private func setUpSocketConnectionObserving() {
socket.onConnect = { [unowned self] in
self.dequeuePendingTextFrames()
self.onConnect?()
self.socketConnectionStatusPublisherSubject.send(.connected)
}
socket.onDisconnect = { [unowned self] _ in
self.onDisconnect?()
}
}

private func dequeuePendingTextFrames() {
while let frame = textFramesQueue.dequeue() {
send(frame) { [unowned self] error in
if let error = error {
self.logger.error(error.localizedDescription)
}
}
self.socketConnectionStatusPublisherSubject.send(.disconnected)
}
}
}
16 changes: 6 additions & 10 deletions Sources/WalletConnectRelay/RelayClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ public final class RelayClient {
}

public var socketConnectionStatusPublisher: AnyPublisher<SocketConnectionStatus, Never> {
socketConnectionStatusPublisherSubject.eraseToAnyPublisher()
dispatcher.socketConnectionStatusPublisher
}

private let messagePublisherSubject = PassthroughSubject<(topic: String, message: String), Never>()
private let socketConnectionStatusPublisherSubject = PassthroughSubject<SocketConnectionStatus, Never>()

private let subscriptionResponsePublisherSubject = PassthroughSubject<(RPCID?, String), Never>()
private var subscriptionResponsePublisher: AnyPublisher<(RPCID?, String), Never> {
Expand Down Expand Up @@ -68,9 +67,6 @@ public final class RelayClient {
dispatcher.onMessage = { [weak self] payload in
self?.handlePayloadMessage(payload)
}
dispatcher.onConnect = { [unowned self] in
self.socketConnectionStatusPublisherSubject.send(.connected)
}
}

/// Instantiates Relay Client
Expand Down Expand Up @@ -132,7 +128,7 @@ public final class RelayClient {
.asRPCRequest()
let message = try request.asJSONEncodedString()
logger.debug("Publishing payload on topic: \(topic)")
try await dispatcher.send(message)
try await dispatcher.protectedSend(message)
}

/// Completes with an acknowledgement from the relay network.
Expand All @@ -156,7 +152,7 @@ public final class RelayClient {
cancellable?.cancel()
onNetworkAcknowledge(nil)
}
dispatcher.send(message) { [weak self] error in
dispatcher.protectedSend(message) { [weak self] error in
if let error = error {
self?.logger.debug("Failed to Publish Payload, error: \(error)")
cancellable?.cancel()
Expand All @@ -183,7 +179,7 @@ public final class RelayClient {
}
completion(nil)
}
dispatcher.send(message) { [weak self] error in
dispatcher.protectedSend(message) { [weak self] error in
if let error = error {
self?.logger.debug("Failed to subscribe to topic \(error)")
cancellable?.cancel()
Expand Down Expand Up @@ -223,7 +219,7 @@ public final class RelayClient {
cancellable?.cancel()
completion(nil)
}
dispatcher.send(message) { [weak self] error in
dispatcher.protectedSend(message) { [weak self] error in
if let error = error {
self?.logger.debug("Failed to unsubscribe from topic")
cancellable?.cancel()
Expand Down Expand Up @@ -279,7 +275,7 @@ public final class RelayClient {
private func acknowledgeRequest(_ request: RPCRequest) throws {
let response = RPCResponse(matchingRequest: request, result: true)
let message = try response.asJSONEncodedString()
dispatcher.send(message) { [unowned self] in
dispatcher.protectedSend(message) { [unowned self] in
if let error = $0 {
logger.debug("Failed to dispatch response: \(response), error: \(error)")
} else {
Expand Down
15 changes: 10 additions & 5 deletions Tests/RelayerTests/DispatcherTests.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import Foundation
import XCTest
import Combine
@testable import WalletConnectRelay
import TestingUtils
import Combine
Expand Down Expand Up @@ -29,6 +30,7 @@ class WebSocketMock: WebSocketConnecting {
}

final class DispatcherTests: XCTestCase {
var publishers = Set<AnyCancellable>()
var sut: Dispatcher!
var webSocket: WebSocketMock!
var networkMonitor: NetworkMonitoringMock!
Expand Down Expand Up @@ -66,18 +68,21 @@ final class DispatcherTests: XCTestCase {

func testOnConnect() {
let expectation = expectation(description: "on connect")
sut.onConnect = {
sut.socketConnectionStatusPublisher.sink { status in
guard status == .connected else { return }
expectation.fulfill()
}
}.store(in: &publishers)
webSocket.onConnect?()
waitForExpectations(timeout: 0.001)
}

func testOnDisconnect() {
func testOnDisconnect() throws {
let expectation = expectation(description: "on disconnect")
sut.onDisconnect = {
try sut.connect()
sut.socketConnectionStatusPublisher.sink { status in
guard status == .disconnected else { return }
expectation.fulfill()
}
}.store(in: &publishers)
webSocket.onDisconnect?(nil)
waitForExpectations(timeout: 0.001)
}
Expand Down
28 changes: 22 additions & 6 deletions Tests/RelayerTests/Mocks/DispatcherMock.swift
Original file line number Diff line number Diff line change
@@ -1,18 +1,34 @@
import Foundation
import JSONRPC
import Combine
@testable import WalletConnectRelay

class DispatcherMock: Dispatching {
private var publishers = Set<AnyCancellable>()
private let socketConnectionStatusPublisherSubject = CurrentValueSubject<SocketConnectionStatus, Never>(.disconnected)
var socketConnectionStatusPublisher: AnyPublisher<SocketConnectionStatus, Never> {
return socketConnectionStatusPublisherSubject.eraseToAnyPublisher()
}

var onConnect: (() -> Void)?
var onDisconnect: (() -> Void)?
var sent = false
var lastMessage: String = ""
var onMessage: ((String) -> Void)?

func connect() {}
func disconnect(closeCode: URLSessionWebSocketTask.CloseCode) {}
func protectedSend(_ string: String, completion: @escaping (Error?) -> Void) {
send(string, completion: completion)
}

var sent = false
var lastMessage: String = ""
func protectedSend(_ string: String) async throws {
try await send(string)
}

func connect() {
socketConnectionStatusPublisherSubject.send(.connected)
}

func disconnect(closeCode: URLSessionWebSocketTask.CloseCode) {
socketConnectionStatusPublisherSubject.send(.disconnected)
}

func send(_ string: String, completion: @escaping (Error?) -> Void) {
sent = true
Expand Down