Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 115 additions & 105 deletions Sources/ContainerizationOS/Linux/Epoll.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,168 +14,178 @@
// limitations under the License.
//===----------------------------------------------------------------------===//

#if os(Linux)
import Foundation

#if canImport(Musl)
import Musl
#elseif canImport(Glibc)
import Glibc
#endif

import Foundation
import Synchronization

/// Register file descriptors to receive events via Linux's
/// epoll syscall surface.
/// A thin wrapper around the Linux epoll syscall surface.
public final class Epoll: Sendable {
public typealias Mask = Int32
public typealias Handler = (@Sendable (Mask) -> Void)
/// An option set over the event bits carried in `epoll_event.events`.
public struct Mask: OptionSet, Sendable {
    /// The raw event bits.
    public let rawValue: UInt32

    public init(rawValue: UInt32) {
        self.rawValue = rawValue
    }

    /// The `EPOLLIN` bit.
    public static let input = Mask(rawValue: UInt32(bitPattern: EPOLLIN))
    /// The `EPOLLOUT` bit.
    public static let output = Mask(rawValue: UInt32(bitPattern: EPOLLOUT))

    /// `true` when `EPOLLHUP` or `EPOLLERR` is set.
    public var isHangup: Bool {
        let hangupOrError = Mask(rawValue: UInt32(bitPattern: EPOLLHUP | EPOLLERR))
        return !intersection(hangupOrError).isEmpty
    }

    /// `true` when `EPOLLRDHUP` is set.
    public var isRemoteHangup: Bool {
        let remoteHangup = Mask(rawValue: UInt32(bitPattern: EPOLLRDHUP))
        return !intersection(remoteHangup).isEmpty
    }

    /// `true` when the `input` bit is set.
    public var readyToRead: Bool {
        isSuperset(of: .input)
    }

    /// `true` when the `output` bit is set.
    public var readyToWrite: Bool {
        isSuperset(of: .output)
    }
}

/// An event returned by `wait()`.
public struct Event: Sendable {
/// The file descriptor the event was reported for.
public let fd: Int32
/// The epoll event flags reported for `fd`.
public let mask: Mask
}

private let epollFD: Int32
private let handlers = SafeMap<Int32, Handler>()
private let pipe = Pipe() // to wake up a waiting epoll_wait
private let shutdownReadFD: Int32
private let shutdownWriteFD: Int32

public init() throws {
let efd = epoll_create1(EPOLL_CLOEXEC)
guard efd > 0 else {
guard efd >= 0 else {
throw POSIXError.fromErrno()
}

var pipeFDs: (Int32, Int32) = (0, 0)
let pipeResult = withUnsafeMutablePointer(to: &pipeFDs) { ptr in
ptr.withMemoryRebound(to: Int32.self, capacity: 2) { buf in
pipe2(buf, O_CLOEXEC | O_NONBLOCK)
}
}
guard pipeResult == 0 else {
let pipeErrno = POSIXError.fromErrno()
close(efd)
throw pipeErrno
}

self.epollFD = efd
try self.add(pipe.fileHandleForReading.fileDescriptor) { _ in }
self.shutdownReadFD = pipeFDs.0
self.shutdownWriteFD = pipeFDs.1

// Register the shutdown pipe read end with epoll.
var event = epoll_event()
event.events = UInt32(bitPattern: EPOLLIN)
event.data.fd = self.shutdownReadFD
let ctlResult = withUnsafeMutablePointer(to: &event) { ptr in
epoll_ctl(efd, EPOLL_CTL_ADD, self.shutdownReadFD, ptr)
}
guard ctlResult == 0 else {
let ctlErrno = POSIXError.fromErrno()
close(self.shutdownReadFD)
close(self.shutdownWriteFD)
close(efd)
throw ctlErrno
}
}

public func add(
_ fd: Int32,
mask: Int32 = EPOLLIN | EPOLLOUT, // HUP is always added
handler: @escaping Handler
) throws {
/// Closes the epoll descriptor and both ends of the shutdown pipe.
// NOTE(review): `init` closes these same descriptors by hand on its
// `epoll_ctl` failure path, after every stored property has been assigned.
// Confirm that path cannot also run this `deinit`, which would close them twice.
deinit {
close(epollFD)
close(shutdownReadFD)
close(shutdownWriteFD)
}

/// Register a file descriptor for edge-triggered monitoring.
public func add(_ fd: Int32, mask: Mask) throws {
guard fcntl(fd, F_SETFL, O_NONBLOCK) == 0 else {
throw POSIXError.fromErrno()
}

let events = EPOLLET | UInt32(bitPattern: mask)
let events = EPOLLET | mask.rawValue

var event = epoll_event()
event.events = events
event.data.fd = fd

try withUnsafeMutablePointer(to: &event) { ptr in
while true {
if epoll_ctl(self.epollFD, EPOLL_CTL_ADD, fd, ptr) == -1 {
if errno == EAGAIN || errno == EINTR {
continue
}
throw POSIXError.fromErrno()
}
break
if epoll_ctl(self.epollFD, EPOLL_CTL_ADD, fd, ptr) == -1 {
throw POSIXError.fromErrno()
}
}
}

self.handlers.set(fd, handler)
/// Remove a file descriptor from the monitored collection.
///
/// - Parameter fd: The descriptor to stop monitoring.
/// - Throws: A `POSIXError` built from `errno` when `epoll_ctl` fails with
///   an errno that `acceptableDeletionErrno()` does not accept.
public func delete(_ fd: Int32) throws {
    var unusedEvent = epoll_event()
    let status = withUnsafeMutablePointer(to: &unusedEvent) {
        epoll_ctl(self.epollFD, EPOLL_CTL_DEL, fd, $0)
    }
    // Success, or a failure the caller is allowed to ignore, ends here.
    guard status == 0 || acceptableDeletionErrno() else {
        throw POSIXError.fromErrno()
    }
}

/// Run the main epoll loop.
/// Wait for events.
///
/// - Parameters:
///   - maxEvents: The maximum number of events to return from a single wait.
///   - timeout: The timeout in milliseconds. `-1` blocks forever;
///     `0` returns immediately if no events are ready.
public func run(maxEvents: Int = 128, timeout: Int32 = -1) throws {
/// Returns an array of events ready for processing. Returns an empty array
/// on shutdown, on timeout, or when `epoll_wait` fails with an error other
/// than `EINTR` or `EAGAIN`.
public func wait(maxEvents: Int = 128, timeout: Int32 = -1) -> [Event] {
var events: [epoll_event] = .init(
repeating: epoll_event(),
count: maxEvents
)

while true {
let n = epoll_wait(self.epollFD, &events, Int32(events.count), timeout)
guard n >= 0 else {
if n < 0 {
if errno == EINTR || errno == EAGAIN {
continue // go back to epoll_wait
continue
}
throw POSIXError.fromErrno()
return []
}

if n == 0 {
return // if epoll wait times out, then n will be 0
return []
}

var result: [Event] = []
result.reserveCapacity(Int(n))
for i in 0..<Int(n) {
let fd = events[i].data.fd
let mask = events[i].events

if fd == self.pipe.fileHandleForReading.fileDescriptor {
close(self.epollFD)
return // this is a shutdown message
if fd == self.shutdownReadFD {
return []
}

guard let handler = handlers.get(fd) else {
continue
}
handler(Int32(bitPattern: mask))
result.append(Event(fd: fd, mask: Mask(rawValue: events[i].events)))
}
return result
}
}

/// Remove the provided fd from the monitored collection.
public func delete(_ fd: Int32) throws {
var event = epoll_event()
let result = withUnsafeMutablePointer(to: &event) { ptr in
epoll_ctl(self.epollFD, EPOLL_CTL_DEL, fd, ptr)
}
if result != 0 {
if !acceptableDeletionErrno() {
throw POSIXError.fromErrno()
}
}
self.handlers.del(fd)
/// Signal the epoll loop to stop waiting.
///
/// Writes a single byte to the write end of the shutdown pipe. The result of
/// the write is deliberately ignored: this is a best-effort wake-up.
public func shutdown() {
    var byte: UInt8 = 0
    // Qualify `write` with whichever C library the file actually imported, so
    // the call resolves on both sides of the conditional import.
    #if canImport(Musl)
    _ = Musl.write(shutdownWriteFD, &byte, 1)
    #elseif canImport(Glibc)
    _ = Glibc.write(shutdownWriteFD, &byte, 1)
    #endif
}

// Reports whether the current `errno` is one that deletion tolerates.
// These values can happen if the caller closed the underlying fd
// before calling delete().
private func acceptableDeletionErrno() -> Bool {
    [ENOENT, EBADF, EPERM].contains(errno)
}

/// Shutdown the epoll handler.
public func shutdown() throws {
// wakes up epoll_wait and triggers a shutdown
try self.pipe.fileHandleForWriting.close()
}

private final class SafeMap<Key: Hashable & Sendable, Value: Sendable>: Sendable {
let dict = Mutex<[Key: Value]>([:])

func set(_ key: Key, _ value: Value) {
dict.withLock { @Sendable in
$0[key] = value
}
}

func get(_ key: Key) -> Value? {
dict.withLock { @Sendable in
$0[key]
}
}

func del(_ key: Key) {
dict.withLock { @Sendable in
_ = $0.removeValue(forKey: key)
}
}
}
}

extension Epoll.Mask {
public var isHangup: Bool {
(self & (EPOLLHUP | EPOLLERR)) != 0
}

public var isRhangup: Bool {
(self & EPOLLRDHUP) != 0
}

public var readyToRead: Bool {
(self & EPOLLIN) != 0
}

public var readyToWrite: Bool {
(self & EPOLLOUT) != 0
}
}

#endif // canImport(Musl)
#endif // os(Linux)
4 changes: 2 additions & 2 deletions vminitd/Sources/vminitd/IOPair.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ final class IOPair: Sendable {
// Remove the fd from our global epoll instance first.
let readFromFd = self.from.fileDescriptor
do {
try ProcessSupervisor.default.poller.delete(readFromFd)
try ProcessSupervisor.default.unregisterFd(readFromFd)
} catch {
logger?.error("failed to delete fd from epoll \(readFromFd): \(error)")
}
Expand Down Expand Up @@ -118,7 +118,7 @@ final class IOPair: Sendable {
let readFrom = OSFile(fd: readFromFd)
let writeTo = OSFile(fd: writeToFd)

try ProcessSupervisor.default.poller.add(readFromFd, mask: EPOLLIN) { mask in
try ProcessSupervisor.default.registerFd(readFromFd, mask: .input) { mask in
self.io.withLock { io in
if io.closed {
return
Expand Down
40 changes: 37 additions & 3 deletions vminitd/Sources/vminitd/ProcessSupervisor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import Logging
import Synchronization

final class ProcessSupervisor: Sendable {
let poller: Epoll
private let poller: Epoll
private let handlers = Mutex<[Int32: @Sendable (Epoll.Mask) -> Void]>([:])

private let queue: DispatchQueue
// `DispatchSourceSignal` is thread-safe.
Expand All @@ -47,11 +48,44 @@ final class ProcessSupervisor: Sendable {
self.poller = try! Epoll()
self.state = Mutex(State())
let t = Thread {
try! self.poller.run()
while true {
let events = self.poller.wait()
if events.isEmpty {
break
}
for event in events {
let handler = self.handlers.withLock { $0[event.fd] }
handler?(event.mask)
}
}
}
t.start()
}

/// Register a file descriptor for epoll monitoring with a handler.
///
/// The handler is stored before the fd is added to epoll, ensuring no
/// events are missed. If adding the fd fails, the handler table entry for
/// `fd` is put back to what it was before the call, so a failed attempt to
/// register an fd that is already registered does not discard its handler.
///
/// - Parameters:
///   - fd: The descriptor to monitor.
///   - mask: The events to monitor; defaults to input and output.
///   - handler: Called with the reported event mask for `fd`.
/// - Throws: Whatever `Epoll.add(_:mask:)` throws.
func registerFd(
    _ fd: Int32,
    mask: Epoll.Mask = [.input, .output],
    handler: @escaping @Sendable (Epoll.Mask) -> Void
) throws {
    // Remember any handler already installed for this fd so it can be restored.
    let previous = self.handlers.withLock { $0.updateValue(handler, forKey: fd) }
    do {
        try self.poller.add(fd, mask: mask)
    } catch {
        // Assigning `nil` removes the entry, which matches the no-previous-handler case.
        self.handlers.withLock { $0[fd] = previous }
        throw error
    }
}

/// Discard the handler registered for `fd`, then remove `fd` from epoll monitoring.
///
/// - Parameter fd: The descriptor to stop monitoring.
/// - Throws: Whatever `Epoll.delete(_:)` throws; the handler has already
///   been discarded by then.
func unregisterFd(_ fd: Int32) throws {
    self.handlers.withLock { table in
        table[fd] = nil
    }
    try self.poller.delete(fd)
}

func ready() {
self.source.setEventHandler {
self.handleSignal()
Expand Down Expand Up @@ -123,6 +157,6 @@ final class ProcessSupervisor: Sendable {

deinit {
source.cancel()
try? poller.shutdown()
poller.shutdown()
}
}
Loading
Loading