-
Notifications
You must be signed in to change notification settings - Fork 83
/
Copy pathHTTPResponseCompressor.swift
386 lines (336 loc) · 18.7 KB
/
HTTPResponseCompressor.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import CNIOExtrasZlib
import NIOCore
import NIOHTTP1
extension StringProtocol {
/// Test if this string starts with the same unicode scalars as the given string, `prefix`.
///
/// - note: This will be faster than `String.startsWith` as no unicode normalisations are performed.
///
/// - parameters:
/// - prefix: The string to match at the beginning of `self`
/// - returns: Whether or not `self` starts with the same unicode scalars as `prefix`.
func startsWithExactly<S: StringProtocol>(_ prefix: S) -> Bool {
self.utf8.starts(with: prefix.utf8)
}
}
/// Given a header value, extracts the q value if there is one present. If one is not present,
/// returns the default q value, 1.0.
private func qValueFromHeader<S: StringProtocol>(_ text: S) -> Float {
let headerParts = text.split(separator: ";", maxSplits: 1, omittingEmptySubsequences: false)
guard headerParts.count > 1 && headerParts[1].count > 0 else {
return 1
}
// We have a Q value.
let qValue = Float(headerParts[1].split(separator: "=", maxSplits: 1, omittingEmptySubsequences: false)[1]) ?? 0
if qValue < 0 || qValue > 1 || qValue.isNaN {
return 0
}
return qValue
}
/// A ``HTTPResponseCompressor`` is a duplex channel handler that handles automatic streaming compression of
/// HTTP responses. It respects the client's Accept-Encoding preferences, including q-values if present,
/// and ensures that clients are served the compression algorithm that works best for them.
///
/// This compressor supports gzip and deflate. It works best if many writes are made between flushes.
///
/// Note that this compressor performs the compression on the event loop thread. This means that compressing
/// some resources, particularly those that do not benefit from compression or that could have been compressed
/// ahead-of-time instead of dynamically, could be a waste of CPU time and latency for relatively minimal
/// benefit. This channel handler should be present in the pipeline only for dynamically-generated and
/// highly-compressible content, which will see the biggest benefits from streaming compression.
///
/// The compressor optionally accepts a predicate to help it determine on a per-request basis if compression
/// should be used, even if the client requests it for the request. This could be used to conditionally and statelessly
/// enable compression based on resource types, or by emitting and checking for marker headers as needed.
/// Since the predicate is always called, it can also be used to clean up those marker headers if compression was
/// not actually supported for any reason (ie. the client didn't provide compatible `Accept` headers, or the
/// response was missing a body due to a special status code being used)
public final class HTTPResponseCompressor: ChannelDuplexHandler, RemovableChannelHandler {
/// This class accepts `HTTPServerRequestPart` inbound
public typealias InboundIn = HTTPServerRequestPart
/// This class emits `HTTPServerRequestPart` inbound.
public typealias InboundOut = HTTPServerRequestPart
/// This class accepts `HTTPServerResponsePart` outbound,
public typealias OutboundIn = HTTPServerResponsePart
/// This class emits `HTTPServerResponsePart` outbound.
public typealias OutboundOut = HTTPServerResponsePart
/// A closure that accepts a response header, optionally modifies it, and returns `true` if the response it belongs to should be compressed.
///
/// - Parameter responseHeaders: The headers that will be used for the response. These can be modified as needed at this stage, to clean up any marker headers used to statelessly determine if compression should occur, and the new headers will be used when writing the response. Compression headers are not yet provided and should not be set; ``HTTPResponseCompressor`` will set them accordingly based on the result of this predicate.
/// - Parameter isCompressionSupported: Set to `true` if the client requested compatible compression, and if the HTTP response supports it, otherwise `false`.
/// - Returns: Return ``CompressionIntent/compressIfPossible`` if the compressor should proceed to compress the response, or ``CompressionIntent/doNotCompress`` if the response should not be compressed.
///
/// - Note: Returning ``CompressionIntent/compressIfPossible`` is only a suggestion — when compression is not supported, the response will be returned as is along with any modified headers.
public typealias ResponseCompressionPredicate = (
_ responseHeaders: inout HTTPResponseHead,
_ isCompressionSupported: Bool
) -> CompressionIntent
/// A signal a ``ResponseCompressionPredicate`` returns to indicate if it intends for compression to be used or not when supported by HTTP.
public struct CompressionIntent: Sendable, Hashable {
/// The internal type ``CompressionIntent`` uses.
enum RawValue {
/// The response should be compressed if supported by the HTTP protocol.
case compressIfPossible
/// The response should not be compressed even if supported by the HTTP protocol.
case doNotCompress
}
/// The raw value of the intent.
let rawValue: RawValue
/// Initialize the raw value with an internal intent.
init(_ rawValue: RawValue) {
self.rawValue = rawValue
}
/// The response should be compressed if supported by the HTTP protocol.
public static let compressIfPossible = CompressionIntent(.compressIfPossible)
/// The response should not be compressed even if supported by the HTTP protocol.
public static let doNotCompress = CompressionIntent(.doNotCompress)
}
/// Errors which can occur when compressing
public enum CompressionError: Error {
// Writes were still pending when shutdown.
case uncompressedWritesPending
/// Data was somehow lost without being written.
case noDataToWrite
}
private var compressor: NIOCompression.Compressor
// A queue of accept headers.
private var acceptQueue = CircularBuffer<[Substring]>(initialCapacity: 8)
private var pendingResponse: PartialHTTPResponse!
private var pendingWritePromise: EventLoopPromise<Void>!
private let initialByteBufferCapacity: Int
private let responseCompressionPredicate: ResponseCompressionPredicate?
/// Initialize a ``HTTPResponseCompressor``.
/// - Parameter initialByteBufferCapacity: Initial size of buffer to allocate when hander is first added.
public convenience init(initialByteBufferCapacity: Int = 1024) {
// TODO: This version is kept around for backwards compatibility and should be merged with the signature below in the next major version: https://github.com/apple/swift-nio-extras/issues/226
self.init(initialByteBufferCapacity: initialByteBufferCapacity, responseCompressionPredicate: nil)
}
/// Initialize a ``HTTPResponseCompressor``.
/// - Parameter initialByteBufferCapacity: Initial size of buffer to allocate when hander is first added.
/// - Parameter responseCompressionPredicate: The predicate used to determine if the response should be compressed or not based on its headers. Defaults to `nil`, which will compress every response this handler sees. This predicate is always called whether the client supports compression for this response or not, so it can be used to clean up any marker headers you may use to determine if compression should be performed or not. Please see ``ResponseCompressionPredicate`` for more details.
public init(
initialByteBufferCapacity: Int = 1024,
responseCompressionPredicate: ResponseCompressionPredicate? = nil
) {
self.initialByteBufferCapacity = initialByteBufferCapacity
self.responseCompressionPredicate = responseCompressionPredicate
self.compressor = NIOCompression.Compressor()
}
/// Setup and add to the pipeline.
/// - Parameter context: Calling context.
public func handlerAdded(context: ChannelHandlerContext) {
pendingResponse = PartialHTTPResponse(
bodyBuffer: context.channel.allocator.buffer(capacity: initialByteBufferCapacity)
)
pendingWritePromise = context.eventLoop.makePromise()
}
/// Remove channel handler from the pipeline.
/// - Parameter context: Calling context
public func handlerRemoved(context: ChannelHandlerContext) {
pendingWritePromise?.fail(CompressionError.uncompressedWritesPending)
compressor.shutdownIfActive()
}
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
if case .head(let requestHead) = unwrapInboundIn(data) {
acceptQueue.append(requestHead.headers[canonicalForm: "accept-encoding"])
}
context.fireChannelRead(data)
}
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let httpData = unwrapOutboundIn(data)
switch httpData {
case .head(var responseHead):
/// Grab the algorithm to use from the bottom of the accept queue, which will help determine if we support compression for this response or not.
let algorithm = compressionAlgorithm()
let requestSupportsCompression = algorithm != nil && responseHead.status.mayHaveResponseBody
/// If a predicate was set, ask it if we should compress when compression is supported, and give the predicate a chance to clean up any marker headers that may have been set even if compression were not supported.
let predicateCompressionIntent =
responseCompressionPredicate?(&responseHead, requestSupportsCompression) ?? .compressIfPossible
/// Make sure that compression should proceed, otherwise stop here and supply the response headers before configuring the compressor.
guard let algorithm, requestSupportsCompression, predicateCompressionIntent == .compressIfPossible else {
context.write(wrapOutboundOut(.head(responseHead)), promise: promise)
return
}
/// Previous handlers in the pipeline might have already set this header even though they should not have as it is compressor responsibility to decide what encoding to use.
responseHead.headers.replaceOrAdd(name: "Content-Encoding", value: algorithm.description)
/// Initialize the compressor and write the header data, which marks the compressor as "active" allowing the `.body` and `.end` cases to properly compress the response rather than passing it as is.
compressor.initialize(encoding: algorithm)
pendingResponse.bufferResponseHead(responseHead)
pendingWritePromise.futureResult.cascade(to: promise)
case .body(let body):
/// We already determined if compression should occur based on the `.head` case above, so here we simply need to check if the compressor is active or not to determine if we should compress the body chunks or stream them as is.
if compressor.isActive {
pendingResponse.bufferBodyPart(body)
pendingWritePromise.futureResult.cascade(to: promise)
} else {
context.write(data, promise: promise)
}
case .end:
guard compressor.isActive else {
context.write(data, promise: promise)
return
}
/// Compress any trailers and finalize the response. Note that this compression stage is not done in `flush()` because we need to clean up the compressor state to be ready for the next response that can come in on the same handler.
pendingResponse.bufferResponseEnd(httpData)
pendingWritePromise.futureResult.cascade(to: promise)
emitPendingWrites(context: context)
compressor.shutdown()
}
}
public func flush(context: ChannelHandlerContext) {
emitPendingWrites(context: context)
context.flush()
}
/// Determines the compression algorithm to use for the next response.
///
/// Returns the compression algorithm to use, or nil if the next response
/// should not be compressed.
private func compressionAlgorithm() -> NIOCompression.Algorithm? {
let acceptHeaders = acceptQueue.removeFirst()
var gzipQValue: Float = -1
var deflateQValue: Float = -1
var anyQValue: Float = -1
for acceptHeader in acceptHeaders {
if acceptHeader.startsWithExactly("gzip") || acceptHeader.startsWithExactly("x-gzip") {
gzipQValue = qValueFromHeader(acceptHeader)
} else if acceptHeader.startsWithExactly("deflate") {
deflateQValue = qValueFromHeader(acceptHeader)
} else if acceptHeader.startsWithExactly("*") {
anyQValue = qValueFromHeader(acceptHeader)
}
}
if gzipQValue > 0 || deflateQValue > 0 {
return gzipQValue > deflateQValue ? .gzip : .deflate
} else if anyQValue > 0 {
// Though gzip is usually less well compressed than deflate, it has slightly
// wider support because it's unabiguous. We therefore default to that unless
// the client has expressed a preference.
return .gzip
}
return nil
}
/// Emits all pending buffered writes to the network, optionally compressing the
/// data. Resets the pending write buffer and promise.
///
/// Called either when a HTTP end message is received or our flush() method is called.
private func emitPendingWrites(context: ChannelHandlerContext) {
let writesToEmit = pendingResponse.flush(compressor: &compressor, allocator: context.channel.allocator)
var pendingPromise = pendingWritePromise
if let writeHead = writesToEmit.0 {
context.write(wrapOutboundOut(.head(writeHead)), promise: pendingPromise)
pendingPromise = nil
}
if let writeBody = writesToEmit.1 {
context.write(wrapOutboundOut(.body(.byteBuffer(writeBody))), promise: pendingPromise)
pendingPromise = nil
}
if let writeEnd = writesToEmit.2 {
context.write(wrapOutboundOut(writeEnd), promise: pendingPromise)
pendingPromise = nil
}
// If we still have the pending promise, we never emitted a write. Fail the promise,
// as anything that is listening for its data somehow lost it.
if let stillPendingPromise = pendingPromise {
stillPendingPromise.fail(CompressionError.noDataToWrite)
}
// Reset the pending promise.
pendingWritePromise = context.eventLoop.makePromise()
}
}
@available(*, unavailable)
extension HTTPResponseCompressor: Sendable {}
/// A buffer object that allows us to keep track of how much of a HTTP response we've seen before
/// a flush.
///
/// The strategy used in this module is that we want to have as much information as possible before
/// we compress, and to compress as few times as possible. This is because in the ideal situation we
/// will have a complete HTTP response to compress in one shot, allowing us to update the content
/// length, rather than force the response to be chunked. It is much easier to do the right thing
/// if we can encapsulate our ideas about how HTTP responses in an entity like this.
private struct PartialHTTPResponse {
var head: HTTPResponseHead?
var body: ByteBuffer
var end: HTTPServerResponsePart?
private let initialBufferSize: Int
var isCompleteResponse: Bool {
head != nil && end != nil
}
var mustFlush: Bool {
end != nil
}
init(bodyBuffer: ByteBuffer) {
body = bodyBuffer
initialBufferSize = bodyBuffer.capacity
}
mutating func bufferResponseHead(_ head: HTTPResponseHead) {
precondition(self.head == nil)
self.head = head
}
mutating func bufferBodyPart(_ bodyPart: IOData) {
switch bodyPart {
case .byteBuffer(var buffer):
body.writeBuffer(&buffer)
case .fileRegion:
fatalError("Cannot currently compress file regions")
}
}
mutating func bufferResponseEnd(_ end: HTTPServerResponsePart) {
precondition(self.end == nil)
guard case .end = end else {
fatalError("Buffering wrong entity type: \(end)")
}
self.end = end
}
private mutating func clear() {
head = nil
end = nil
body.clear()
body.reserveCapacity(initialBufferSize)
}
/// Flushes the buffered data into its constituent parts.
///
/// Returns a three-tuple of a HTTP response head, compressed body bytes, and any end that
/// may have been buffered. Each of these types is optional.
///
/// If the head is flushed, it will have had its headers mutated based on whether we had the whole
/// response or not. If nil, the head has previously been emitted.
///
/// If the body is nil, it means no writes were buffered (that is, our buffer of bytes has no
/// readable bytes in it). This should usually mean that no write is issued.
///
/// Calling this function resets the buffer, freeing any excess memory allocated in the internal
/// buffer and losing all copies of the other HTTP data. At this point it may freely be reused.
mutating func flush(
compressor: inout NIOCompression.Compressor,
allocator: ByteBufferAllocator
) -> (HTTPResponseHead?, ByteBuffer?, HTTPServerResponsePart?) {
var outputBody: ByteBuffer? = nil
if self.body.readableBytes > 0 || mustFlush {
let compressedBody = compressor.compress(inputBuffer: &self.body, allocator: allocator, finalise: mustFlush)
if isCompleteResponse {
head!.headers.remove(name: "transfer-encoding")
head!.headers.replaceOrAdd(name: "content-length", value: "\(compressedBody.readableBytes)")
} else if head != nil && head!.status.mayHaveResponseBody {
head!.headers.remove(name: "content-length")
head!.headers.replaceOrAdd(name: "transfer-encoding", value: "chunked")
}
outputBody = compressedBody
}
let response = (head, outputBody, end)
clear()
return response
}
}