Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds AsyncChannel and AsyncThrowingChannel with tests #2086

Merged
merged 4 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions Amplify/Core/Support/AsyncChannel.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Foundation

public actor AsyncChannel<Element: Sendable>: AsyncSequence {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need public keyword for all these?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed lets make these internal

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, though the AsyncSequence is a protocol with associated type. There is no way to represent it in Swift for a return value without the implementation

public struct Iterator: AsyncIteratorProtocol, Sendable {
private let channel: AsyncChannel<Element>

public init(_ channel: AsyncChannel<Element>) {
self.channel = channel
}

public mutating func next() async -> Element? {
Task.isCancelled ? nil : await channel.next()
}
}

public enum InternalFailure: Error {
case cannotSendAfterTerminated
}
public typealias ChannelContinuation = CheckedContinuation<Element?, Never>

private var continuations: [ChannelContinuation] = []
private var elements: [Element] = []
private var cancelled: Bool = false
private var terminated: Bool = false

private var hasNext: Bool {
!continuations.isEmpty && !elements.isEmpty
}

private var canTerminate: Bool {
terminated && elements.isEmpty && !continuations.isEmpty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be OR cases?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic is correct

}

init() {
}

public nonisolated func makeAsyncIterator() -> Iterator {
Iterator(self)
}

public func next() async -> Element? {
if cancelled || terminated {
return nil
}
return await withCheckedContinuation { (continuation: ChannelContinuation) in
continuations.append(continuation)
processNext()
}
}

public func send(_ element: Element) throws {
if Task.isCancelled {
cancelled = true
processNext()
throw CancellationError()
}
guard !terminated else {
throw InternalFailure.cannotSendAfterTerminated
}
elements.append(element)
processNext()
}

public func finish() {
terminated = true
processNext()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we calling processNext after finishing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see the processNext function

}

private func processNext() {
if cancelled && !continuations.isEmpty {
let continuation = continuations.removeFirst()
assert(continuations.isEmpty)
continuation.resume(returning: nil)
return
}

if canTerminate {
let continuation = continuations.removeFirst()
assert(continuations.isEmpty)
assert(elements.isEmpty)
continuation.resume(returning: nil)
return
}

guard hasNext else {
return
}

assert(!continuations.isEmpty)
assert(!elements.isEmpty)

let continuation = continuations.removeFirst()
let element = elements.removeFirst()

continuation.resume(returning: element)
}
}
126 changes: 126 additions & 0 deletions Amplify/Core/Support/AsyncThrowingChannel.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//

import Foundation

public actor AsyncThrowingChannel<Element: Sendable, Failure: Error>: AsyncSequence {
public struct Iterator: AsyncIteratorProtocol, Sendable {
private let channel: AsyncThrowingChannel<Element, Failure>

public init(_ channel: AsyncThrowingChannel<Element, Failure>) {
self.channel = channel
}

public mutating func next() async throws -> Element? {
try Task.checkCancellation()
return try await channel.next()
}
}

public enum InternalFailure: Error {
case cannotSendAfterTerminated
}
public typealias ChannelContinuation = CheckedContinuation<Element?, Error>

private var continuations: [ChannelContinuation] = []
private var elements: [Element] = []
private var cancelled: Bool = false
private var terminated: Bool = false
private var error: Error? = nil

private var hasNext: Bool {
!continuations.isEmpty && !elements.isEmpty
}

private var canFail: Bool {
error != nil && !continuations.isEmpty
}

private var canTerminate: Bool {
terminated && elements.isEmpty && !continuations.isEmpty
}

init() {
}

public nonisolated func makeAsyncIterator() -> Iterator {
Iterator(self)
}

public func next() async throws -> Element? {
if cancelled {
throw CancellationError()
}
return try await withCheckedThrowingContinuation { (continuation: ChannelContinuation) in
continuations.append(continuation)
processNext()
}
}

public func send(_ element: Element) throws {
if Task.isCancelled {
cancelled = true
processNext()
throw CancellationError()
}
guard !terminated else {
throw InternalFailure.cannotSendAfterTerminated
}
elements.append(element)
processNext()
}

public func fail(_ error: Error) where Failure == Error {
self.error = error
processNext()
}

public func finish() {
terminated = true
processNext()
}

private func processNext() {
if cancelled && !continuations.isEmpty {
let continuation = continuations.removeFirst()
assert(continuations.isEmpty)
continuation.resume(throwing: CancellationError())
return
}

if canFail {
let continuation = continuations.removeFirst()
assert(continuations.isEmpty)
assert(elements.isEmpty)
assert(error != nil)
if let error = error {
continuation.resume(throwing: error)
return
}
}

if canTerminate {
let continuation = continuations.removeFirst()
assert(continuations.isEmpty)
assert(elements.isEmpty)
continuation.resume(returning: nil)
return
}

guard hasNext else {
return
}

assert(!continuations.isEmpty)
assert(!elements.isEmpty)

let continuation = continuations.removeFirst()
let element = elements.removeFirst()

continuation.resume(returning: element)
}
}
Loading