Skip to content

Commit

Permalink
Networking error publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
flypaper0 committed Aug 31, 2023
1 parent 42e42d8 commit 4d01f54
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 116 deletions.
13 changes: 13 additions & 0 deletions Sources/WalletConnectNetworking/NetworkInteracting.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@ public protocol NetworkInteracting {
on request: ProtocolMethod
) -> AnyPublisher<ResponseSubscriptionErrorPayload<Request>, Never>

func subscribeOnRequest<RequestParams: Codable>(
protocolMethod: ProtocolMethod,
requestOfType: RequestParams.Type,
subscription: @escaping (RequestSubscriptionPayload<RequestParams>) async throws -> Void
)

func subscribeOnResponse<Request: Codable, Response: Codable>(
protocolMethod: ProtocolMethod,
requestOfType: Request.Type,
responseOfType: Response.Type,
subscription: @escaping (ResponseSubscriptionPayload<Request, Response>) async throws -> Void
)

func getClientId() throws -> String
}

Expand Down
41 changes: 41 additions & 0 deletions Sources/WalletConnectNetworking/NetworkingInteractor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ public class NetworkingInteractor: NetworkInteracting {
private let rpcHistory: RPCHistory
private let logger: ConsoleLogging

private let errorPublisherSubject = PassthroughSubject<Error, Never>()

private let requestPublisherSubject = PassthroughSubject<(topic: String, request: RPCRequest, decryptedPayload: Data, publishedAt: Date, derivedTopic: String?), Never>()
private let responsePublisherSubject = PassthroughSubject<(topic: String, request: RPCRequest, response: RPCResponse, publishedAt: Date, derivedTopic: String?), Never>()

Expand All @@ -19,6 +21,10 @@ public class NetworkingInteractor: NetworkInteracting {
responsePublisherSubject.eraseToAnyPublisher()
}

public var errorPublisher: AnyPublisher<Error, Never> {
return errorPublisherSubject.eraseToAnyPublisher()
}

public var logsPublisher: AnyPublisher<Log, Never> {
logger.logsPublisher
.merge(with: serializer.logsPublisher)
Expand Down Expand Up @@ -83,6 +89,41 @@ public class NetworkingInteractor: NetworkInteracting {
rpcHistory.deleteAll(forTopics: topics)
}

public func subscribeOnRequest<RequestParams: Codable>(
protocolMethod: ProtocolMethod,
requestOfType: RequestParams.Type,
subscription: @escaping (RequestSubscriptionPayload<RequestParams>) async throws -> Void
) {
requestSubscription(on: protocolMethod)
.sink { [unowned self] (payload: RequestSubscriptionPayload<RequestParams>) in
Task(priority: .high) {
do {
try await subscription(payload)
} catch {
errorPublisherSubject.send(error)
}
}
}.store(in: &publishers)
}

public func subscribeOnResponse<Request: Codable, Response: Codable>(
protocolMethod: ProtocolMethod,
requestOfType: Request.Type,
responseOfType: Response.Type,
subscription: @escaping (ResponseSubscriptionPayload<Request, Response>) async throws -> Void
) {
responseSubscription(on: protocolMethod)
.sink { [unowned self] (payload: ResponseSubscriptionPayload<Request, Response>) in
Task(priority: .high) {
do {
try await subscription(payload)
} catch {
errorPublisherSubject.send(error)
}
}
}.store(in: &publishers)
}

public func requestSubscription<RequestParams: Codable>(on request: ProtocolMethod) -> AnyPublisher<RequestSubscriptionPayload<RequestParams>, Never> {
return requestPublisher
.filter { rpcRequest in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ class DeleteNotifySubscriptionSubscriber {
private let networkingInteractor: NetworkInteracting
private let kms: KeyManagementServiceProtocol
private let logger: ConsoleLogging
private var publishers = [AnyCancellable]()
private let notifyStorage: NotifyStorage

init(networkingInteractor: NetworkInteracting,
Expand All @@ -21,14 +20,12 @@ class DeleteNotifySubscriptionSubscriber {
}

private func subscribeForDeleteSubscription() {
let protocolMethod = NotifyDeleteProtocolMethod()
networkingInteractor.requestSubscription(on: protocolMethod)
.sink { [unowned self] (payload: RequestSubscriptionPayload<NotifyDeleteResponsePayload.Wrapper>) in

guard let (_, _) = try? NotifyDeleteResponsePayload.decodeAndVerify(from: payload.request)
else { fatalError() /* TODO: Handle error */ }

logger.debug("Peer deleted subscription")
}.store(in: &publishers)
networkingInteractor.subscribeOnRequest(
protocolMethod: NotifyDeleteProtocolMethod(),
requestOfType: NotifyDeleteResponsePayload.Wrapper.self
) { [unowned self] payload in
let (_, _) = try NotifyDeleteResponsePayload.decodeAndVerify(from: payload.request)
logger.debug("Peer deleted subscription")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ class NotifyMessageSubscriber {
private let notifyStorage: NotifyStorage
private let crypto: CryptoProvider
private let logger: ConsoleLogging
private var publishers = [AnyCancellable]()
private let notifyMessagePublisherSubject = PassthroughSubject<NotifyMessageRecord, Never>()

public var notifyMessagePublisher: AnyPublisher<NotifyMessageRecord, Never> {
Expand All @@ -26,44 +25,40 @@ class NotifyMessageSubscriber {
}

private func subscribeForNotifyMessages() {
let protocolMethod = NotifyMessageProtocolMethod()
networkingInteractor.requestSubscription(on: protocolMethod)
.sink { [unowned self] (payload: RequestSubscriptionPayload<NotifyMessagePayload.Wrapper>) in
networkingInteractor.subscribeOnRequest(
protocolMethod: NotifyMessageProtocolMethod(),
requestOfType: NotifyMessagePayload.Wrapper.self
) { [unowned self] payload in
logger.debug("Received Notify Message")

logger.debug("Received Notify Message")
let (messagePayload, claims) = try NotifyMessagePayload.decodeAndVerify(from: payload.request)
let dappPubKey = try DIDKey(did: claims.iss)
let messageData = try JSONEncoder().encode(messagePayload.message)

Task(priority: .high) {
let (messagePayload, claims) = try NotifyMessagePayload.decodeAndVerify(from: payload.request)
let dappPubKey = try DIDKey(did: claims.iss)
let messageData = try JSONEncoder().encode(messagePayload.message)
let record = NotifyMessageRecord(id: payload.id.string, topic: payload.topic, message: messagePayload.message, publishedAt: payload.publishedAt)
notifyStorage.setMessage(record)
notifyMessagePublisherSubject.send(record)

let record = NotifyMessageRecord(id: payload.id.string, topic: payload.topic, message: messagePayload.message, publishedAt: payload.publishedAt)
notifyStorage.setMessage(record)
notifyMessagePublisherSubject.send(record)
let receiptPayload = NotifyMessageReceiptPayload(
keyserver: keyserver, dappPubKey: dappPubKey,
messageHash: crypto.keccak256(messageData).toHexString(),
app: messagePayload.app
)

let receiptPayload = NotifyMessageReceiptPayload(
keyserver: keyserver, dappPubKey: dappPubKey,
messageHash: crypto.keccak256(messageData).toHexString(),
app: messagePayload.app
)
let wrapper = try identityClient.signAndCreateWrapper(
payload: receiptPayload,
account: messagePayload.account
)

let wrapper = try identityClient.signAndCreateWrapper(
payload: receiptPayload,
account: messagePayload.account
)
let response = RPCResponse(id: payload.id, result: wrapper)

let response = RPCResponse(id: payload.id, result: wrapper)

try await networkingInteractor.respond(
topic: payload.topic,
response: response,
protocolMethod: NotifyMessageProtocolMethod()
)

logger.debug("Sent Notify Receipt Response")
}

}.store(in: &publishers)
try await networkingInteractor.respond(
topic: payload.topic,
response: response,
protocolMethod: NotifyMessageProtocolMethod()
)

logger.debug("Sent Notify Receipt Response")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,35 +34,35 @@ private extension NotifyUpdateResponseSubscriber {
}

func subscribeForUpdateResponse() {
let protocolMethod = NotifyUpdateProtocolMethod()
networkingInteractor.responseSubscription(on: protocolMethod)
.sink {[unowned self] (payload: ResponseSubscriptionPayload<NotifyUpdatePayload.Wrapper, NotifyUpdateResponsePayload.Wrapper>) in
Task(priority: .high) {
logger.debug("Received Notify Update response")
networkingInteractor.subscribeOnResponse(
protocolMethod: NotifyUpdateProtocolMethod(),
requestOfType: NotifyUpdatePayload.Wrapper.self,
responseOfType: NotifyUpdateResponsePayload.Wrapper.self
) { [unowned self] payload in
logger.debug("Received Notify Update response")

let subscriptionTopic = payload.topic
let subscriptionTopic = payload.topic

let (requestPayload, requestClaims) = try NotifyUpdatePayload.decodeAndVerify(from: payload.request)
let (_, _) = try NotifyUpdateResponsePayload.decodeAndVerify(from: payload.response)
let (requestPayload, requestClaims) = try NotifyUpdatePayload.decodeAndVerify(from: payload.request)
let (_, _) = try NotifyUpdateResponsePayload.decodeAndVerify(from: payload.response)

let scope = try await buildScope(selected: requestPayload.scope, dappUrl: requestPayload.dappUrl)
let scope = try await buildScope(selected: requestPayload.scope, dappUrl: requestPayload.dappUrl)

guard let oldSubscription = notifyStorage.getSubscription(topic: subscriptionTopic) else {
logger.debug("NotifyUpdateResponseSubscriber Subscription does not exist")
subscriptionPublisherSubject.send(.failure(Errors.subscriptionDoesNotExist))
return
}
let expiry = Date(timeIntervalSince1970: TimeInterval(requestClaims.exp))
guard let oldSubscription = notifyStorage.getSubscription(topic: subscriptionTopic) else {
logger.debug("NotifyUpdateResponseSubscriber Subscription does not exist")
subscriptionPublisherSubject.send(.failure(Errors.subscriptionDoesNotExist))
return
}
let expiry = Date(timeIntervalSince1970: TimeInterval(requestClaims.exp))

let updatedSubscription = NotifySubscription(topic: subscriptionTopic, account: oldSubscription.account, relay: oldSubscription.relay, metadata: oldSubscription.metadata, scope: scope, expiry: expiry, symKey: oldSubscription.symKey)
let updatedSubscription = NotifySubscription(topic: subscriptionTopic, account: oldSubscription.account, relay: oldSubscription.relay, metadata: oldSubscription.metadata, scope: scope, expiry: expiry, symKey: oldSubscription.symKey)

try await notifyStorage.setSubscription(updatedSubscription)
try await notifyStorage.setSubscription(updatedSubscription)

subscriptionPublisherSubject.send(.success(updatedSubscription))
subscriptionPublisherSubject.send(.success(updatedSubscription))

logger.debug("Updated Subscription")
}
}.store(in: &publishers)
logger.debug("Updated Subscription")
}
}

func buildScope(selected: String, dappUrl: String) async throws -> [String: ScopeValue] {
Expand Down
Loading

0 comments on commit 4d01f54

Please sign in to comment.