Skip to content

Commit

Permalink
Try using a container to avoid modifying the outer dictionary when th…
Browse files Browse the repository at this point in the history
…e inner one changes
  • Loading branch information
djones6 committed Jan 4, 2019
1 parent d12758a commit 9a0d1da
Showing 1 changed file with 35 additions and 11 deletions.
46 changes: 35 additions & 11 deletions Sources/KituraNet/IncomingSocketManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,30 @@ import Foundation
import LoggerAPI
import Socket

// Holder for a processor thread's socketHandlers dictionary, as a buffer between
// the dictionary itself and the outer one that associates each dictionary with an
// epoll FD. This is an attempt to prevent a crash which may be related to
// dictionary modification on different threads in Swift 5.
fileprivate class SocketHandlerContainer: Equatable, Hashable {

var handlers: [Int32: IncomingSocketHandler]
let id: Int

init(id: Int) {
self.id = id
self.handlers = [Int32: IncomingSocketHandler]()
}

static func == (lhs: SocketHandlerContainer, rhs: SocketHandlerContainer) -> Bool {
return lhs.id == rhs.id
}

func hash(into hasher: inout Hasher) {
hasher.combine(id)
}

}

/**
The IncomingSocketManager class is in charge of managing all of the incoming sockets.
In particular, it is in charge of:
Expand All @@ -34,7 +58,7 @@ In particular, it is in charge of:
b. Adding new incoming sockets to the epoll descriptor for read events
c. Running the "thread" that does the epoll_wait
2. Creating and managing the IncomingSocketHandlers and IncomingHTTPDataProcessors
(one pair per incomng socket)
(one pair per incoming socket)
3. Cleaning up idle sockets, when new incoming sockets arrive.

### Usage Example: ###
Expand All @@ -52,13 +76,13 @@ public class IncomingSocketManager {
/// A number of mappings from socket file descriptor to IncomingSocketHandler.
/// On Linux with epoll, there is one dictionary per epoll thread. Otherwise,
/// there is a single dictionary at index 0.
private var socketHandlers: [Int32: [Int32: IncomingSocketHandler]]
private var socketHandlers: [Int32: SocketHandlerContainer]

/// The sum total of handlers across all epoll threads.
/// Used by SocketManagerTests to check number of registered handlers.
var socketHandlerCount: Int {
return socketHandlers.reduce(0, { i, handlerTuple in
i + handlerTuple.1.count
i + handlerTuple.1.handlers.count
})
}

Expand Down Expand Up @@ -91,14 +115,14 @@ public class IncomingSocketManager {
public init() {
var epollDescriptors = [Int32]()
var queues = [DispatchQueue]()
var socketHandlers = [Int32: [Int32: IncomingSocketHandler]]()
var socketHandlers = [Int32: SocketHandlerContainer]()
for i in 0 ..< numberOfEpollTasks {
// Note: The parameter to epoll_create is ignored on modern Linux's
let epollFd = epoll_create(100)
epollDescriptors.append(epollFd)
queues.append(DispatchQueue(label: "IncomingSocketManager\(i)"))
// socketHandlers is split into a separate dictionary for each epoll thread.
socketHandlers[epollFd] = [Int32: IncomingSocketHandler]()
socketHandlers[epollFd] = SocketHandlerContainer(id: Int(epollFd))
}
self.epollDescriptors = epollDescriptors
self.queues = queues
Expand All @@ -119,8 +143,8 @@ public class IncomingSocketManager {
*/
public init() {
// socketHandlers is not split across threads.
self.socketHandlers = [Int32: [Int32: IncomingSocketHandler]]()
self.socketHandlers[0] = [Int32: IncomingSocketHandler]()
self.socketHandlers = [Int32: SocketHandlerContainer]()
self.socketHandlers[0] = SocketHandlerContainer(id: 0)
}
#endif

Expand Down Expand Up @@ -178,7 +202,7 @@ public class IncomingSocketManager {
try socket.setBlocking(mode: false)

let handler = IncomingSocketHandler(socket: socket, using: processor)
socketHandlers[socketHandlerIndex]?[socket.socketfd] = handler
socketHandlers[socketHandlerIndex]?.handlers[socket.socketfd] = handler

#if !GCD_ASYNCH && os(Linux)
var event = epoll_event()
Expand Down Expand Up @@ -236,7 +260,7 @@ public class IncomingSocketManager {

Log.error("Error occurred on a file descriptor of an epool wait")
} else {
if let handler = socketHandlers[epollDescriptor]?[event.data.fd] {
if let handler = socketHandlers[epollDescriptor]?.handlers[event.data.fd] {

if (event.events & EPOLLOUT.rawValue) != 0 {
handler.handleWrite()
Expand Down Expand Up @@ -303,11 +327,11 @@ public class IncomingSocketManager {
return
}
let maxInterval = now.timeIntervalSinceReferenceDate
socketHandlers[socketHandlerIndex]?.forEach { (fileDescriptor, handler) in
socketHandlers[socketHandlerIndex]?.handlers.forEach { (fileDescriptor, handler) in
if !removeAll && handler.processor != nil && (handler.processor?.inProgress ?? false || maxInterval < handler.processor?.keepAliveUntil ?? maxInterval) {
//continue
} else {
socketHandlers[socketHandlerIndex]?.removeValue(forKey: fileDescriptor)
socketHandlers[socketHandlerIndex]?.handlers.removeValue(forKey: fileDescriptor)

#if !GCD_ASYNCH && os(Linux)
let result = epoll_ctl(epollDescriptor(fd: fileDescriptor), EPOLL_CTL_DEL, fileDescriptor, nil)
Expand Down

0 comments on commit 9a0d1da

Please sign in to comment.