Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds AsyncChannel and AsyncThrowingChannel with tests #2086

Merged
merged 4 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions Amplify/Core/Support/AsyncChannel.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Foundation

actor AsyncChannel<Element: Sendable>: AsyncSequence {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Call it AmplifyAsyncChannel ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is reason to specialize this name

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if developer uses both Amplify and AsyncAlgorithms this names can conflict. We cannot use module name to distinguish between them because of the known issue in SPM where it cannot figure out the type Amplify vs the module Amplify. swiftlang/swift#43510

struct Iterator: AsyncIteratorProtocol, Sendable {
private let channel: AsyncChannel<Element>

init(_ channel: AsyncChannel<Element>) {
self.channel = channel
}

mutating func next() async -> Element? {
await channel.next()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What can be canceled here?

}
}

enum InternalFailure: Error {
case cannotSendAfterTerminated
}
typealias ChannelContinuation = CheckedContinuation<Element?, Never>

private var continuations: [ChannelContinuation] = []
private var elements: [Element] = []
private var terminated: Bool = false

private var hasNext: Bool {
!continuations.isEmpty && !elements.isEmpty
}

private var canTerminate: Bool {
terminated && elements.isEmpty && !continuations.isEmpty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be OR cases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic is correct

}

init() {
}

nonisolated func makeAsyncIterator() -> Iterator {
Copy link
Contributor

@royjit royjit Aug 9, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isnonisolated okay? Will the channel work if we iterate in two different task?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function cannot be isolated in the actor. it also does not mutate any state. It creates the iterator which is used by the sequence and all those functions are isolated within the actor.

Iterator(self)
}

func next() async -> Element? {
await withCheckedContinuation { (continuation: ChannelContinuation) in
continuations.append(continuation)
processNext()
}
}

func send(_ element: Element) throws {
guard !terminated else {
throw InternalFailure.cannotSendAfterTerminated
}
elements.append(element)
processNext()
}

func finish() {
terminated = true
processNext()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we calling processNext after finishing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see the processNext function

}

private func processNext() {
if canTerminate {
let continuation = continuations.removeFirst()
assert(continuations.isEmpty)
assert(elements.isEmpty)
continuation.resume(returning: nil)
return
}

guard hasNext else {
return
}

assert(!continuations.isEmpty)
assert(!elements.isEmpty)

let continuation = continuations.removeFirst()
let element = elements.removeFirst()

continuation.resume(returning: element)
}
}
110 changes: 110 additions & 0 deletions Amplify/Core/Support/AsyncThrowingChannel.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Foundation

actor AsyncThrowingChannel<Element: Sendable, Failure: Error>: AsyncSequence {
struct Iterator: AsyncIteratorProtocol, Sendable {
private let channel: AsyncThrowingChannel<Element, Failure>

init(_ channel: AsyncThrowingChannel<Element, Failure>) {
self.channel = channel
}

mutating func next() async throws -> Element? {
try await channel.next()
}
}

enum InternalFailure: Error {
case cannotSendAfterTerminated
}
typealias ChannelContinuation = CheckedContinuation<Element?, Error>

private var continuations: [ChannelContinuation] = []
private var elements: [Element] = []
private var terminated: Bool = false
private var error: Error? = nil

private var hasNext: Bool {
!continuations.isEmpty && !elements.isEmpty
}

private var canFail: Bool {
error != nil && !continuations.isEmpty
}

private var canTerminate: Bool {
terminated && elements.isEmpty && !continuations.isEmpty
}

init() {
}

nonisolated func makeAsyncIterator() -> Iterator {
Iterator(self)
}

func next() async throws -> Element? {
try await withCheckedThrowingContinuation { (continuation: ChannelContinuation) in
continuations.append(continuation)
processNext()
}
}

func send(_ element: Element) throws {
guard !terminated else {
throw InternalFailure.cannotSendAfterTerminated
}
elements.append(element)
processNext()
}


func fail(_ error: Error) where Failure == Error {
self.error = error
processNext()
}

func finish() {
terminated = true
processNext()
}

private func processNext() {
if canFail {
let continuation = continuations.removeFirst()
assert(continuations.isEmpty)
assert(elements.isEmpty)
assert(error != nil)
if let error = error {
continuation.resume(throwing: error)
return
}
}

if canTerminate {
let continuation = continuations.removeFirst()
assert(continuations.isEmpty)
assert(elements.isEmpty)
continuation.resume(returning: nil)
return
}

guard hasNext else {
return
}

assert(!continuations.isEmpty)
assert(!elements.isEmpty)

let continuation = continuations.removeFirst()
let element = elements.removeFirst()

continuation.resume(returning: element)
}
}
Loading