Skip to content

Commit b17e704

Browse files
authored
fix: Rely on on task completion vs http completion to end tasks (#134)
* fix: Rely on on task completion vs http completion to end tasks * Add tests
1 parent 5d214c4 commit b17e704

File tree

3 files changed

+136
-69
lines changed

3 files changed

+136
-69
lines changed

Sources/Amplitude/Utilities/EventPipeline.swift

+21-38
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public class EventPipeline {
1919
let events: String
2020
let task: URLSessionDataTask
2121
}
22-
private var uploads = [UploadTaskInfo]()
22+
private var uploads = [URL: UploadTaskInfo]()
2323

2424
init(amplitude: Amplitude) {
2525
self.amplitude = amplitude
@@ -54,32 +54,36 @@ public class EventPipeline {
5454
eventCount = 0
5555
guard let storage = self.storage else { return }
5656
storage.rollover()
57-
guard let eventFiles: [URL]? = storage.read(key: StorageKey.EVENTS) else { return }
58-
cleanupUploads()
59-
if pendingUploads == 0 {
60-
for eventFile in eventFiles! {
61-
guard let eventsString = storage.getEventsString(eventBlock: eventFile) else {
62-
continue
57+
guard let eventFiles: [URL] = storage.read(key: StorageKey.EVENTS) else { return }
58+
for eventFile in eventFiles {
59+
uploadsQueue.sync {
60+
guard uploads[eventFile] == nil else {
61+
amplitude.logger?.log(message: "Existing upload in progress, skipping...")
62+
return
6363
}
64-
if eventsString.isEmpty {
65-
continue
64+
guard let eventsString = storage.getEventsString(eventBlock: eventFile),
65+
!eventsString.isEmpty else {
66+
return
6667
}
6768
let uploadTask = httpClient.upload(events: eventsString) { [weak self] result in
69+
guard let self else {
70+
return
71+
}
6872
let responseHandler = storage.getResponseHandler(
69-
configuration: self!.amplitude.configuration,
70-
eventPipeline: self!,
73+
configuration: self.amplitude.configuration,
74+
eventPipeline: self,
7175
eventBlock: eventFile,
7276
eventsString: eventsString
7377
)
7478
responseHandler.handle(result: result)
75-
self?.cleanupUploads()
79+
self.completeUpload(for: eventFile)
7680
}
77-
if let upload = uploadTask {
78-
add(uploadTask: UploadTaskInfo(events: eventsString, task: upload))
81+
if let uploadTask {
82+
uploads[eventFile] = UploadTaskInfo(events: eventsString, task: uploadTask)
7983
}
8084
}
81-
completion?()
8285
}
86+
completion?()
8387
}
8488

8589
func start() {
@@ -101,31 +105,10 @@ public class EventPipeline {
101105
}
102106

103107
extension EventPipeline {
104-
internal func cleanupUploads() {
105-
uploadsQueue.sync {
106-
let before = uploads.count
107-
var newPending = uploads
108-
newPending.removeAll { uploadInfo in
109-
let shouldRemove = uploadInfo.task.state != .running
110-
return shouldRemove
111-
}
112-
uploads = newPending
113-
let after = uploads.count
114-
amplitude.logger?.log(message: "Cleaned up \(before - after) non-running uploads.")
115-
}
116-
}
117-
118-
internal var pendingUploads: Int {
119-
var uploadsCount = 0
120-
uploadsQueue.sync {
121-
uploadsCount = uploads.count
122-
}
123-
return uploadsCount
124-
}
125108

126-
internal func add(uploadTask: UploadTaskInfo) {
109+
func completeUpload(for eventBlock: URL) {
127110
uploadsQueue.sync {
128-
uploads.append(uploadTask)
111+
uploads[eventBlock] = nil
129112
}
130113
}
131114
}

Tests/AmplitudeTests/Supports/TestUtilities.swift

+52-30
Original file line numberDiff line numberDiff line change
@@ -47,62 +47,75 @@ class FakeInMemoryStorage: Storage {
4747
var keyValueStore = [String: Any?]()
4848
var eventsStore = [URL: [BaseEvent]]()
4949
var index = URL(string: "0")!
50+
let storageQueue = DispatchQueue(label: "Amplitude.FakeInMemoryStorage")
5051

5152
func write(key: StorageKey, value: Any?) throws {
52-
switch key {
53-
case .EVENTS:
54-
if let event = value as? BaseEvent {
55-
var chunk = eventsStore[index, default: [BaseEvent]()]
56-
chunk.append(event)
57-
eventsStore[index] = chunk
53+
storageQueue.sync {
54+
switch key {
55+
case .EVENTS:
56+
if let event = value as? BaseEvent {
57+
var chunk = eventsStore[index, default: [BaseEvent]()]
58+
chunk.append(event)
59+
eventsStore[index] = chunk
60+
}
61+
default:
62+
keyValueStore[key.rawValue] = value
5863
}
59-
default:
60-
keyValueStore[key.rawValue] = value
6164
}
6265
}
6366

6467
func read<T>(key: StorageKey) -> T? {
65-
var result: T?
66-
switch key {
67-
case .EVENTS:
68-
result = Array(eventsStore.keys) as? T
69-
default:
70-
result = keyValueStore[key.rawValue] as? T
68+
storageQueue.sync {
69+
var result: T?
70+
switch key {
71+
case .EVENTS:
72+
result = Array(eventsStore.keys) as? T
73+
default:
74+
result = keyValueStore[key.rawValue] as? T
75+
}
76+
return result
7177
}
72-
return result
7378
}
7479

7580
func getEventsString(eventBlock: EventBlock) -> String? {
76-
var content: String?
77-
content = "["
78-
content = content! + (eventsStore[eventBlock] ?? []).map { $0.toString() }.joined(separator: ", ")
79-
content = content! + "]"
80-
return content
81+
storageQueue.sync {
82+
var content: String?
83+
content = "["
84+
content = content! + (eventsStore[eventBlock] ?? []).map { $0.toString() }.joined(separator: ", ")
85+
content = content! + "]"
86+
return content
87+
}
8188
}
8289

8390
func rollover() {
8491
}
8592

8693
func reset() {
87-
keyValueStore.removeAll()
88-
eventsStore.removeAll()
94+
storageQueue.sync {
95+
keyValueStore.removeAll()
96+
eventsStore.removeAll()
97+
}
8998
}
9099

91100
func remove(eventBlock: EventBlock) {
92-
eventsStore.removeValue(forKey: eventBlock)
101+
storageQueue.sync {
102+
_ = eventsStore.removeValue(forKey: eventBlock)
103+
}
93104
}
94105

95106
func splitBlock(eventBlock: EventBlock, events: [BaseEvent]) {
96107
}
97108

98109
func events() -> [BaseEvent] {
99-
var result: [BaseEvent] = []
100-
for (_, value) in eventsStore {
101-
for event in value {
102-
result.append(event)
110+
storageQueue.sync {
111+
var result: [BaseEvent] = []
112+
for (_, value) in eventsStore {
113+
for event in value {
114+
result.append(event)
115+
}
103116
}
117+
return result
104118
}
105-
return result
106119
}
107120

108121
nonisolated func getResponseHandler(
@@ -121,6 +134,7 @@ class FakeHttpClient: HttpClient {
121134
var uploadCount: Int = 0
122135
var uploadedEvents: [String] = []
123136
var uploadExpectations: [XCTestExpectation] = []
137+
var uploadResults: [Result<Int, Error>] = []
124138

125139
override func upload(events: String, completion: @escaping (_ result: Result<Int, Error>) -> Void)
126140
-> URLSessionDataTask?
@@ -130,8 +144,16 @@ class FakeHttpClient: HttpClient {
130144
if !uploadExpectations.isEmpty {
131145
uploadExpectations.removeFirst().fulfill()
132146
}
133-
completion(Result.success(200))
134-
return nil
147+
let result: Result<Int, Error>
148+
if uploadResults.isEmpty {
149+
result = .success(200)
150+
} else {
151+
result = uploadResults.removeFirst()
152+
}
153+
DispatchQueue.global().async {
154+
completion(result)
155+
}
156+
return URLSession.shared.dataTask(with: URLRequest(url: URL(string: "https://www.amplitude.com")!))
135157
}
136158

137159
override func getDate() -> Date {

Tests/AmplitudeTests/Utilities/EventPipelineTests.swift

+63-1
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,14 @@ final class EventPipelineTests: XCTestCase {
1515
private var configuration: Configuration!
1616
private var pipeline: EventPipeline!
1717
private var httpClient: FakeHttpClient!
18+
private var storage: PersistentStorage!
1819

1920
override func setUp() {
2021
super.setUp()
21-
let storage = FakeInMemoryStorage()
22+
storage = PersistentStorage(
23+
storagePrefix: "event-pipeline-tests",
24+
logger: nil,
25+
diagonostics: Diagnostics())
2226
configuration = Configuration(
2327
apiKey: "testApiKey",
2428
flushIntervalMillis: Int(Self.FLUSH_INTERVAL_SECONDS * 1000),
@@ -30,6 +34,11 @@ final class EventPipelineTests: XCTestCase {
3034
pipeline.httpClient = httpClient
3135
}
3236

37+
override func tearDown() {
38+
super.tearDown()
39+
storage.reset()
40+
}
41+
3342
func testInit() {
3443
XCTAssertEqual(pipeline.amplitude.configuration.apiKey, configuration.apiKey)
3544
}
@@ -77,4 +86,57 @@ final class EventPipelineTests: XCTestCase {
7786
XCTAssertEqual(pipeline.amplitude.configuration.offline, true)
7887
XCTAssertEqual(httpClient.uploadCount, 0, "There should be no uploads when offline")
7988
}
89+
90+
func testSimultaneousFlush() {
91+
let testEvent = BaseEvent(userId: "unit-test", deviceId: "unit-test-machine", eventType: "testEvent")
92+
try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent)
93+
94+
let flushExpectations = (0..<2).map { _ in
95+
let expectation = expectation(description: "flush")
96+
pipeline.flush {
97+
expectation.fulfill()
98+
}
99+
return expectation
100+
}
101+
102+
let waitResult = XCTWaiter.wait(for: flushExpectations, timeout: 1)
103+
XCTAssertNotEqual(waitResult, .timedOut)
104+
XCTAssertEqual(httpClient.uploadCount, 1)
105+
let uploadedEvents = BaseEvent.fromArrayString(jsonString: httpClient.uploadedEvents[0])
106+
XCTAssertEqual(uploadedEvents?.count, 1)
107+
XCTAssertEqual(uploadedEvents![0].eventType, "testEvent")
108+
}
109+
110+
func testInvalidEventUpload() {
111+
(0..<2).forEach { i in
112+
let testEvent = BaseEvent(userId: "test", deviceId: "test-machine", eventType: "testEvent-\(i)")
113+
try? pipeline.storage?.write(key: StorageKey.EVENTS, value: testEvent)
114+
}
115+
116+
let invalidResponseData = "{\"events_with_invalid_fields\": {\"user_id\": [0]}}".data(using: .utf8)!
117+
118+
httpClient.uploadResults = [
119+
.failure(HttpClient.Exception.httpError(code: 400, data: invalidResponseData))
120+
]
121+
122+
let uploadExpectations = (0..<2).map { _ in expectation(description: "httpresponse") }
123+
httpClient.uploadExpectations = uploadExpectations
124+
125+
pipeline.flush()
126+
wait(for: [uploadExpectations[0]], timeout: 1)
127+
128+
pipeline.flush()
129+
wait(for: [uploadExpectations[1]], timeout: 1)
130+
131+
XCTAssertEqual(httpClient.uploadCount, 2)
132+
133+
let uploadedEvents0 = BaseEvent.fromArrayString(jsonString: httpClient.uploadedEvents[0])
134+
XCTAssertEqual(uploadedEvents0?.count, 2)
135+
XCTAssertEqual(uploadedEvents0?[0].eventType, "testEvent-0")
136+
XCTAssertEqual(uploadedEvents0?[1].eventType, "testEvent-1")
137+
138+
let uploadedEvents1 = BaseEvent.fromArrayString(jsonString: httpClient.uploadedEvents[1])
139+
XCTAssertEqual(uploadedEvents1?.count, 1)
140+
XCTAssertEqual(uploadedEvents1?[0].eventType, "testEvent-1")
141+
}
80142
}

0 commit comments

Comments
 (0)