From 7ef325568eb54fa32473abf0bf40d17e64745b4c Mon Sep 17 00:00:00 2001 From: Danny Canter Date: Mon, 30 Mar 2026 19:31:14 -0700 Subject: [PATCH] Epoll: Rework epoll type I spent a little time trying to fix all of the various issues with the epoll wrapper we had, before realizing most of them would be completely gone if we just reworked the type itself :). The callback nature and all of the internal handler/state tracking have (to me) no purpose being in the type itself. All of this logic can live outside and the wrapper should just be a typesafe abstraction around it that you can build on. This is what this change aims to do. There should be zero behavioral difference here. --- Sources/ContainerizationOS/Linux/Epoll.swift | 220 +++++++++--------- vminitd/Sources/vminitd/IOPair.swift | 4 +- .../Sources/vminitd/ProcessSupervisor.swift | 40 +++- vminitd/Sources/vminitd/VsockProxy.swift | 12 +- 4 files changed, 160 insertions(+), 116 deletions(-) diff --git a/Sources/ContainerizationOS/Linux/Epoll.swift b/Sources/ContainerizationOS/Linux/Epoll.swift index e3249e38..31fac556 100644 --- a/Sources/ContainerizationOS/Linux/Epoll.swift +++ b/Sources/ContainerizationOS/Linux/Epoll.swift @@ -14,68 +14,136 @@ // limitations under the License. //===----------------------------------------------------------------------===// +#if os(Linux) +import Foundation + #if canImport(Musl) import Musl +#elseif canImport(Glibc) +import Glibc +#endif -import Foundation -import Synchronization - -/// Register file descriptors to receive events via Linux's -/// epoll syscall surface. +/// A thin wrapper around the Linux epoll syscall surface. public final class Epoll: Sendable { - public typealias Mask = Int32 - public typealias Handler = (@Sendable (Mask) -> Void) + /// A set of epoll event flags.
+ public struct Mask: OptionSet, Sendable { + public let rawValue: UInt32 + + public init(rawValue: UInt32) { + self.rawValue = rawValue + } + + public static let input = Mask(rawValue: UInt32(bitPattern: EPOLLIN)) + public static let output = Mask(rawValue: UInt32(bitPattern: EPOLLOUT)) + + public var isHangup: Bool { + !self.isDisjoint(with: Mask(rawValue: UInt32(bitPattern: EPOLLHUP | EPOLLERR))) + } + + public var isRemoteHangup: Bool { + !self.isDisjoint(with: Mask(rawValue: UInt32(bitPattern: EPOLLRDHUP))) + } + + public var readyToRead: Bool { + self.contains(.input) + } + + public var readyToWrite: Bool { + self.contains(.output) + } + } + + /// An event returned by `wait()`. + public struct Event: Sendable { + public let fd: Int32 + public let mask: Mask + } private let epollFD: Int32 - private let handlers = SafeMap() - private let pipe = Pipe() // to wake up a waiting epoll_wait + private let shutdownReadFD: Int32 + private let shutdownWriteFD: Int32 public init() throws { let efd = epoll_create1(EPOLL_CLOEXEC) - guard efd > 0 else { + guard efd >= 0 else { throw POSIXError.fromErrno() } + + var pipeFDs: (Int32, Int32) = (0, 0) + let pipeResult = withUnsafeMutablePointer(to: &pipeFDs) { ptr in + ptr.withMemoryRebound(to: Int32.self, capacity: 2) { buf in + pipe2(buf, O_CLOEXEC | O_NONBLOCK) + } + } + guard pipeResult == 0 else { + let pipeErrno = POSIXError.fromErrno() + close(efd) + throw pipeErrno + } + self.epollFD = efd - try self.add(pipe.fileHandleForReading.fileDescriptor) { _ in } + self.shutdownReadFD = pipeFDs.0 + self.shutdownWriteFD = pipeFDs.1 + + // Register the shutdown pipe read end with epoll. 
+ var event = epoll_event() + event.events = UInt32(bitPattern: EPOLLIN) + event.data.fd = self.shutdownReadFD + let ctlResult = withUnsafeMutablePointer(to: &event) { ptr in + epoll_ctl(efd, EPOLL_CTL_ADD, self.shutdownReadFD, ptr) + } + guard ctlResult == 0 else { + let ctlErrno = POSIXError.fromErrno() + close(self.shutdownReadFD) + close(self.shutdownWriteFD) + close(efd) + throw ctlErrno + } } - public func add( - _ fd: Int32, - mask: Int32 = EPOLLIN | EPOLLOUT, // HUP is always added - handler: @escaping Handler - ) throws { + deinit { + close(epollFD) + close(shutdownReadFD) + close(shutdownWriteFD) + } + + /// Register a file descriptor for edge-triggered monitoring. + public func add(_ fd: Int32, mask: Mask) throws { guard fcntl(fd, F_SETFL, O_NONBLOCK) == 0 else { throw POSIXError.fromErrno() } - let events = EPOLLET | UInt32(bitPattern: mask) + let events = EPOLLET | mask.rawValue var event = epoll_event() event.events = events event.data.fd = fd try withUnsafeMutablePointer(to: &event) { ptr in - while true { - if epoll_ctl(self.epollFD, EPOLL_CTL_ADD, fd, ptr) == -1 { - if errno == EAGAIN || errno == EINTR { - continue - } - throw POSIXError.fromErrno() - } - break + if epoll_ctl(self.epollFD, EPOLL_CTL_ADD, fd, ptr) == -1 { + throw POSIXError.fromErrno() } } + } - self.handlers.set(fd, handler) + /// Remove a file descriptor from the monitored collection. + public func delete(_ fd: Int32) throws { + var event = epoll_event() + let result = withUnsafeMutablePointer(to: &event) { ptr in + epoll_ctl(self.epollFD, EPOLL_CTL_DEL, fd, ptr) + } + if result != 0 { + if !acceptableDeletionErrno() { + throw POSIXError.fromErrno() + } + } } - /// Run the main epoll loop. + /// Wait for events. /// - /// max events to return in a single wait - /// timeout in ms. - /// -1 means block forever. - /// 0 means return immediately if no events. - public func run(maxEvents: Int = 128, timeout: Int32 = -1) throws { + /// Returns an array of events ready for processing. 
Returns an empty array + /// on shutdown or timeout. + public func wait(maxEvents: Int = 128, timeout: Int32 = -1) -> [Event] { var events: [epoll_event] = .init( repeating: epoll_event(), count: maxEvents @@ -83,46 +151,34 @@ public final class Epoll: Sendable { while true { let n = epoll_wait(self.epollFD, &events, Int32(events.count), timeout) - guard n >= 0 else { + if n < 0 { if errno == EINTR || errno == EAGAIN { - continue // go back to epoll_wait + continue } - throw POSIXError.fromErrno() + return [] } if n == 0 { - return // if epoll wait times out, then n will be 0 + return [] } + var result: [Event] = [] + result.reserveCapacity(Int(n)) for i in 0.. Bool { errno == ENOENT || errno == EBADF || errno == EPERM } - - /// Shutdown the epoll handler. - public func shutdown() throws { - // wakes up epoll_wait and triggers a shutdown - try self.pipe.fileHandleForWriting.close() - } - - private final class SafeMap: Sendable { - let dict = Mutex<[Key: Value]>([:]) - - func set(_ key: Key, _ value: Value) { - dict.withLock { @Sendable in - $0[key] = value - } - } - - func get(_ key: Key) -> Value? { - dict.withLock { @Sendable in - $0[key] - } - } - - func del(_ key: Key) { - dict.withLock { @Sendable in - _ = $0.removeValue(forKey: key) - } - } - } -} - -extension Epoll.Mask { - public var isHangup: Bool { - (self & (EPOLLHUP | EPOLLERR)) != 0 - } - - public var isRhangup: Bool { - (self & EPOLLRDHUP) != 0 - } - - public var readyToRead: Bool { - (self & EPOLLIN) != 0 - } - - public var readyToWrite: Bool { - (self & EPOLLOUT) != 0 - } } -#endif // canImport(Musl) +#endif // os(Linux) diff --git a/vminitd/Sources/vminitd/IOPair.swift b/vminitd/Sources/vminitd/IOPair.swift index f8b06623..7243e16f 100644 --- a/vminitd/Sources/vminitd/IOPair.swift +++ b/vminitd/Sources/vminitd/IOPair.swift @@ -69,7 +69,7 @@ final class IOPair: Sendable { // Remove the fd from our global epoll instance first. 
let readFromFd = self.from.fileDescriptor do { - try ProcessSupervisor.default.poller.delete(readFromFd) + try ProcessSupervisor.default.unregisterFd(readFromFd) } catch { logger?.error("failed to delete fd from epoll \(readFromFd): \(error)") } @@ -118,7 +118,7 @@ final class IOPair: Sendable { let readFrom = OSFile(fd: readFromFd) let writeTo = OSFile(fd: writeToFd) - try ProcessSupervisor.default.poller.add(readFromFd, mask: EPOLLIN) { mask in + try ProcessSupervisor.default.registerFd(readFromFd, mask: .input) { mask in self.io.withLock { io in if io.closed { return diff --git a/vminitd/Sources/vminitd/ProcessSupervisor.swift b/vminitd/Sources/vminitd/ProcessSupervisor.swift index 6bfe3e9d..f5fafdc1 100644 --- a/vminitd/Sources/vminitd/ProcessSupervisor.swift +++ b/vminitd/Sources/vminitd/ProcessSupervisor.swift @@ -20,7 +20,8 @@ import Logging import Synchronization final class ProcessSupervisor: Sendable { - let poller: Epoll + private let poller: Epoll + private let handlers = Mutex<[Int32: @Sendable (Epoll.Mask) -> Void]>([:]) private let queue: DispatchQueue // `DispatchSourceSignal` is thread-safe. @@ -47,11 +48,44 @@ final class ProcessSupervisor: Sendable { self.poller = try! Epoll() self.state = Mutex(State()) let t = Thread { - try! self.poller.run() + while true { + let events = self.poller.wait() + if events.isEmpty { + break + } + for event in events { + let handler = self.handlers.withLock { $0[event.fd] } + handler?(event.mask) + } + } } t.start() } + /// Register a file descriptor for epoll monitoring with a handler. + /// + /// The handler is stored before the fd is added to epoll, ensuring no + /// events are missed. 
+ func registerFd( + _ fd: Int32, + mask: Epoll.Mask = [.input, .output], + handler: @escaping @Sendable (Epoll.Mask) -> Void + ) throws { + self.handlers.withLock { $0[fd] = handler } + do { + try self.poller.add(fd, mask: mask) + } catch { + self.handlers.withLock { _ = $0.removeValue(forKey: fd) } + throw error + } + } + + /// Remove a file descriptor from epoll monitoring and discard its handler. + func unregisterFd(_ fd: Int32) throws { + self.handlers.withLock { _ = $0.removeValue(forKey: fd) } + try self.poller.delete(fd) + } + func ready() { self.source.setEventHandler { self.handleSignal() @@ -123,6 +157,6 @@ final class ProcessSupervisor: Sendable { deinit { source.cancel() - try? poller.shutdown() + poller.shutdown() } } diff --git a/vminitd/Sources/vminitd/VsockProxy.swift b/vminitd/Sources/vminitd/VsockProxy.swift index 9e92d9fa..a0a1a5a6 100644 --- a/vminitd/Sources/vminitd/VsockProxy.swift +++ b/vminitd/Sources/vminitd/VsockProxy.swift @@ -233,8 +233,8 @@ extension VsockProxy { ) do { - try ProcessSupervisor.default.poller.delete(clientFile.fileDescriptor) - try ProcessSupervisor.default.poller.delete(serverFile.fileDescriptor) + try ProcessSupervisor.default.unregisterFd(clientFile.fileDescriptor) + try ProcessSupervisor.default.unregisterFd(serverFile.fileDescriptor) try conn.close() try relayTo.close() } catch { @@ -243,7 +243,7 @@ extension VsockProxy { c.resume() } - try! ProcessSupervisor.default.poller.add(clientFile.fileDescriptor, mask: EPOLLIN | EPOLLOUT) { mask in + try! 
ProcessSupervisor.default.registerFd(clientFile.fileDescriptor, mask: [.input, .output]) { mask in if mask.readyToRead && !eofFromClient { let (fromEof, toEof) = Self.transferData( fromFile: &clientFile, @@ -269,7 +269,7 @@ extension VsockProxy { if mask.isHangup { eofFromClient = true eofFromServer = true - } else if mask.isRhangup && !eofFromClient { + } else if mask.isRemoteHangup && !eofFromClient { // half close, shut down client to server transfer // we should see no more EPOLLIN events on the client fd // and no more EPOLLOUT events on the server fd @@ -295,7 +295,7 @@ extension VsockProxy { } } - try! ProcessSupervisor.default.poller.add(serverFile.fileDescriptor, mask: EPOLLIN | EPOLLOUT) { mask in + try! ProcessSupervisor.default.registerFd(serverFile.fileDescriptor, mask: [.input, .output]) { mask in if mask.readyToRead && !eofFromServer { let (fromEof, toEof) = Self.transferData( fromFile: &serverFile, @@ -321,7 +321,7 @@ extension VsockProxy { if mask.isHangup { eofFromClient = true eofFromServer = true - } else if mask.isRhangup && !eofFromServer { + } else if mask.isRemoteHangup && !eofFromServer { // half close, shut down server to client transfer // we should see no more EPOLLIN events on the server fd // and no more EPOLLOUT events on the client fd