Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scenario 10 for swift-combine. #374

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions swift-combine/Package.resolved
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-atomics.git",
"state" : {
"revision" : "ff3d2212b6b093db7f177d0855adbc4ef9c5f036",
"version" : "1.0.3"
"revision" : "cd142fd2f64be2100422d658e7411e39489da985",
"version" : "1.2.0"
}
},
{
Expand Down
7 changes: 5 additions & 2 deletions swift-combine/Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@ let package = Package(
],
dependencies: [
.package(url: "https://github.com/alexsteinerde/docker-client-swift.git", from: "0.1.2"),
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.2.0"),
],
targets: [
.executableTarget(
name: "EasyRacer",
dependencies: []),
dependencies: [
.product(name: "Atomics", package: "swift-atomics"),
]),
.testTarget(
name: "EasyRacerTests",
dependencies: [
"EasyRacer",
.product(name: "DockerClientSwift", package: "docker-client-swift")
.product(name: "DockerClientSwift", package: "docker-client-swift"),
]),
]
)
155 changes: 154 additions & 1 deletion swift-combine/Sources/EasyRacer/EasyRacer.swift
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import Atomics
import Combine
import Foundation

Expand All @@ -14,14 +15,62 @@ extension URLSession {
guard
let text: String = String(data: data, encoding: .utf8)
else {
throw URLError(.cannotDecodeContentData)
throw URLError(.cannotDecodeRawData)
}

return text
}
}
}

struct Repeating<Output>: Publisher {
typealias Failure = Never

private let element: Output

init(_ element: Output) {
self.element = element
}

func receive<S>(subscriber: S) where S : Subscriber, Never == S.Failure, Output == S.Input {
let subscription: some Subscription = RepeatingSubscription(element, subscriber)
subscriber.receive(subscription: subscription)
}

final class RepeatingSubscription<Downstream: Subscriber>: Subscription where Downstream.Input == Output, Downstream.Failure == Never {
private let element: Output
private let subscriber: Downstream
private var active: ManagedAtomic<Bool> = ManagedAtomic(true)

init(_ element: Output, _ subscriber: Downstream) {
self.element = element
self.subscriber = subscriber
}

func request(_ demand: Subscribers.Demand) {
if active.load(ordering: .relaxed) {
Task {
if let count: Int = demand.max {
let nextDemand: Subscribers.Demand? = (0..<count)
.map { _ in subscriber.receive(element) }
.last
if let nextDemand: Subscribers.Demand = nextDemand {
request(nextDemand)
}
} else {
let nextDemand: Subscribers.Demand = subscriber.receive(element)
request(nextDemand.max ?? 0 > 0 ? nextDemand : demand)
}
}
}
}

func cancel() {
active.store(false, ordering: .relaxed)
}
}
}

@main
public struct EasyRacer {
let baseURL: URL
Expand Down Expand Up @@ -204,6 +253,109 @@ public struct EasyRacer {
.eraseToAnyPublisher()
}

func scenario10() -> AnyPublisher<String, Never> {
let url: URL = baseURL.appendingPathComponent("10")
let urlSession: URLSession = URLSession(configuration: .ephemeral)
let id: String = UUID().uuidString

guard
let urlComps: URLComponents = URLComponents(
url: url, resolvingAgainstBaseURL: false
)
else {
return Empty().eraseToAnyPublisher()
}

var blockerURLComps = urlComps
blockerURLComps.queryItems = [URLQueryItem(name: id, value: nil)]
guard
let blockerURL: URL = blockerURLComps.url
else {
return Empty().eraseToAnyPublisher()
}
let blocker: some Publisher<String?, Never> = urlSession
.bodyTextTaskPublisher(for: blockerURL)
.map { $0 }.replaceError(with: nil)

// Busy-wait by publishing nils as quickly as possible
let blocking: some Publisher<String?, Never> = Repeating(nil)

func currentWallTime() -> TimeInterval {
var timeval: timeval = timeval()
// Should never error as parameters are valid
gettimeofday(&timeval, nil)

return TimeInterval(timeval.tv_sec) + TimeInterval(timeval.tv_usec) / 1_000_000.0
}
func currentCPUTime() -> TimeInterval {
var rusage: rusage = rusage()
// Should never error as parameters are valid
getrusage(RUSAGE_SELF, &rusage)
let utime = rusage.ru_utime
let stime = rusage.ru_stime
let secs = utime.tv_sec + stime.tv_sec
let usecs = utime.tv_usec + stime.tv_usec

return TimeInterval(secs) + TimeInterval(usecs) / 1_000_000.0
}
func reportProcessLoad(
startWallTime: TimeInterval, startCPUTime: TimeInterval
) -> AnyPublisher<String?, URLError> {
let endWallTime: TimeInterval = currentWallTime()
let endCPUTime: TimeInterval = currentCPUTime()
let totalUsageOfCPU: Double = (endCPUTime - startCPUTime) / (endWallTime - startWallTime)

var reporterURLComps = urlComps
reporterURLComps.queryItems = [URLQueryItem(name: id, value: "\(totalUsageOfCPU)")]
guard
let reporterURL: URL = reporterURLComps.url
else {
return Fail(error: URLError(.badURL)).eraseToAnyPublisher()
}

return urlSession
.dataTaskPublisher(for: reporterURL)
.flatMap { data, response -> AnyPublisher<String?, URLError> in
guard
let response: HTTPURLResponse = response as? HTTPURLResponse,
200..<400 ~= response.statusCode
else {
return Fail(error: URLError(.badServerResponse))
.eraseToAnyPublisher()
}

if 300..<400 ~= response.statusCode {
return Just(())
.delay(for: .seconds(1), scheduler: DispatchQueue.global(qos: .background))
.flatMap { _ in
reportProcessLoad(
startWallTime: endWallTime, startCPUTime: endCPUTime
)
}
.eraseToAnyPublisher()
}

guard
let text: String = String(data: data, encoding: .utf8)
else {
return Fail(error: URLError(.cannotDecodeRawData))
.eraseToAnyPublisher()
}

return Just(text).setFailureType(to: URLError.self).eraseToAnyPublisher()
}
.eraseToAnyPublisher()
}
let reporter: some Publisher<String?, Never> = reportProcessLoad(
startWallTime: currentWallTime(), startCPUTime: currentCPUTime()
).replaceError(with: nil)

return blocker.merge(with: blocking).first { $0 != nil }
.merge(with: reporter)
.compactMap { $0 }.first { $0 != "" }
.eraseToAnyPublisher()
}

public func scenarios() -> AnyPublisher<[String?], Never> {
let scenarios = [
(1, scenario1()),
Expand All @@ -214,6 +366,7 @@ public struct EasyRacer {
(7, scenario7()),
(8, scenario8()),
(9, scenario9()),
(10, scenario10()),
(3, scenario3()), // This has to come last, as it frequently causes other scenarios to fail
]
return scenarios.publisher
Expand Down
Loading