Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 48 additions & 26 deletions Sources/Subjects/ReplaySubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@ public final class ReplaySubject<Output, Failure: Error>: Subject {
let subscriptions: [Subscription<AnySubscriber<Output, Failure>>]

do {
lock.lock()
defer { lock.unlock() }
lock.lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you undo all of the re-formatting you did here please?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can do that. But it seems you are using Indent 4 everywhere and if you look into this file in the main branch, you will see that just this do scope has Indent 2. Let me know if that is what you want and I can change it back ;)

defer { lock.unlock() }

guard isActive else { return }
guard isActive else { return }

buffer.append(value)
if buffer.count > bufferSize {
buffer.removeFirst()
}
buffer.append(value)
if buffer.count > bufferSize {
buffer.removeFirst()
}

subscriptions = self.subscriptions
subscriptions = self.subscriptions
}

subscriptions.forEach { $0.forwardValueToBuffer(value) }
Expand Down Expand Up @@ -81,25 +81,30 @@ public final class ReplaySubject<Output, Failure: Error>: Subject {
public func receive<Subscriber: Combine.Subscriber>(subscriber: Subscriber) where Failure == Subscriber.Failure, Output == Subscriber.Input {
let subscriberIdentifier = subscriber.combineIdentifier

let subscription = Subscription(downstream: AnySubscriber(subscriber)) { [weak self] in
self?.completeSubscriber(withIdentifier: subscriberIdentifier)
}

let buffer: [Output]
let completion: Subscribers.Completion<Failure>?
let subscription = Subscription(
downstream: AnySubscriber(subscriber),
cancellationHandler: { [weak self] in
self?.completeSubscriber(withIdentifier: subscriberIdentifier)
},
requestReplay: { [weak self] in
let buffer: [Output]?
let completion: Subscribers.Completion<Failure>?

do {
lock.lock()
defer { lock.unlock() }
do {
self?.lock.lock()
defer { self?.lock.unlock() }

subscriptions.append(subscription)
buffer = self?.buffer
completion = self?.completion
}

buffer = self.buffer
completion = self.completion
}
return (buffer, completion)
}
)

subscriptions.append(subscription)
// (*) It was called here.
subscriber.receive(subscription: subscription)
subscription.replay(buffer, completion: completion)
}

private func completeSubscriber(withIdentifier subscriberIdentifier: CombineIdentifier) {
Expand All @@ -115,17 +120,25 @@ extension ReplaySubject {
final class Subscription<Downstream: Subscriber>: Combine.Subscription where Output == Downstream.Input, Failure == Downstream.Failure {
private var demandBuffer: DemandBuffer<Downstream>?
private var cancellationHandler: (() -> Void)?
private var requestReplay: (() -> (buffer: [Output]?, completion: Subscribers.Completion<Failure>?))?

private var isActive = false

fileprivate let innerSubscriberIdentifier: CombineIdentifier

init(downstream: Downstream, cancellationHandler: (() -> Void)?) {
init(
downstream: Downstream,
cancellationHandler: (() -> Void)?,
requestReplay: (() -> ([Output]?, Subscribers.Completion<Failure>?))?
) {
self.demandBuffer = DemandBuffer(subscriber: downstream)
self.innerSubscriberIdentifier = downstream.combineIdentifier
self.cancellationHandler = cancellationHandler
self.requestReplay = requestReplay
}

func replay(_ buffer: [Output], completion: Subscribers.Completion<Failure>?) {
buffer.forEach(forwardValueToBuffer)
func replay(_ buffer: [Output]?, completion: Subscribers.Completion<Failure>?) {
buffer?.forEach(forwardValueToBuffer)

if let completion = completion {
forwardCompletionToBuffer(completion)
Expand All @@ -138,11 +151,20 @@ extension ReplaySubject {

func forwardCompletionToBuffer(_ completion: Subscribers.Completion<Failure>) {
demandBuffer?.complete(completion: completion)
cancel()

if isActive {
cancel()
}
}

func request(_ demand: Subscribers.Demand) {
_ = demandBuffer?.demand(demand)

isActive = true

if let replay = requestReplay?() {
self.replay(replay.buffer, completion: replay.completion)
}
}

func cancel() {
Expand Down
31 changes: 31 additions & 0 deletions Tests/ShareReplayTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -241,5 +241,36 @@ final class ShareReplayTests: XCTestCase {
XCTAssertEqual(completions, [.finished])
XCTAssertNil(weakSource)
}

func testSequentialUpstreamWithShareReplay() {
let publisher = Just(1)
.eraseToAnyPublisher()
.share(replay: 1)

var valueReceived = false
var finishedReceived = false

Publishers.Zip(publisher, publisher)
.sink(
receiveCompletion: { completion in
switch completion {
case .finished:
finishedReceived = true
case let .failure(error):
XCTFail("Unexpected completion - failure: \(error).")
}
},
receiveValue: { leftValue, rightValue in
XCTAssertEqual(leftValue, 1)
XCTAssertEqual(rightValue, 1)

valueReceived = true
}
)
.store(in: &subscriptions)

XCTAssertTrue(valueReceived)
XCTAssertTrue(finishedReceived)
}
}
#endif