Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions compio-driver/src/sys/driver/iocp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use windows_sys::Win32::{Foundation::ERROR_OPERATION_ABORTED, System::IO::OVERLA
use crate::{
AsyncifyPool, DriverType, Entry, ErasedKey, ProactorBuilder,
control::Carrier,
sys::{extra::IocpExtra, prelude::*},
sys::{driver::AwakeFlag, extra::IocpExtra, prelude::*},
};

mod cp;
Expand Down Expand Up @@ -186,10 +186,14 @@ impl Driver {
entry.notify();
has_entry = true;
}
if self.notify.reset() {
has_entry = true;
}

if !has_entry {
for e in self.notify.port.poll(timeout)? {
if let Some(e) = Self::create_entry(notify, &mut self.waits, e) {
self.notify.set_awake();
e.notify()
}
}
Expand Down Expand Up @@ -217,16 +221,24 @@ impl AsRawFd for Driver {
pub(crate) struct Notify {
port: cp::Port,
overlapped: Overlapped,
awake: AwakeFlag,
}

impl Notify {
fn new(port: cp::Port, overlapped: Overlapped) -> Self {
Self { port, overlapped }
Self {
port,
overlapped,
awake: AwakeFlag::new(),
}
}

/// Notify the inner driver.
pub fn notify(&self) -> io::Result<()> {
self.port.post_raw(&self.overlapped)
fn set_awake(&self) {
self.awake.set();
}

fn reset(&self) -> bool {
self.awake.reset()
}
}

Expand All @@ -236,6 +248,8 @@ impl Wake for Notify {
}

fn wake_by_ref(self: &Arc<Self>) {
self.notify().ok();
if !self.awake.wake() {
self.port.post_raw(&self.overlapped).ok();
}
}
}
43 changes: 26 additions & 17 deletions compio-driver/src/sys/driver/iour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,12 @@ impl Driver {
}

// Auto means that it choose to wait or not automatically.
fn submit_auto(&mut self, timeout: Option<Duration>) -> io::Result<()> {
fn submit_auto(&mut self, timeout: Option<Duration>, need_wait: bool) -> io::Result<()> {
instrument!(compio_log::Level::TRACE, "submit_auto", ?timeout);

// when taskrun is true, there are completed cqes wait to handle, no need to
// block the submit
let want_sqe = if self.inner.submission().taskrun() {
let want_sqe = if !need_wait || self.inner.submission().taskrun() {
0
} else {
1
Expand All @@ -173,7 +173,7 @@ impl Driver {
trace!("submit result: {res:?}");
match res {
Ok(_) => {
if self.inner.completion().is_empty() {
if want_sqe > 0 && self.inner.completion().is_empty() {
Err(io::ErrorKind::TimedOut.into())
} else {
Ok(())
Expand All @@ -197,11 +197,8 @@ impl Driver {
}

fn poll_entries(&mut self) -> bool {
let mut has_entry = self.poll_blocking();

let mut cqueue = self.inner.completion();
cqueue.sync();
has_entry |= !cqueue.is_empty();
let cqueue = self.inner.completion();
let has_entry = !cqueue.is_empty();
for entry in cqueue {
match entry.user_data() {
Self::CANCEL => {}
Expand All @@ -210,7 +207,9 @@ impl Driver {
if !more(flags) {
self.need_push_notifier = true;
}
self.notifier.clear().expect("cannot clear notifier");
if let Err(_e) = self.notifier.clear() {
error!("failed to clear notifier: {_e}");
}
}
key => {
let flags = entry.flags();
Expand Down Expand Up @@ -282,8 +281,7 @@ impl Driver {
}
Err(_) => {
drop(squeue);
self.poll_entries();
match self.submit_auto(Some(Duration::ZERO)) {
match self.submit_auto(Some(Duration::ZERO), true) {
Ok(()) => {}
Err(e)
if matches!(
Expand All @@ -292,6 +290,12 @@ impl Driver {
) => {}
Err(e) => return Err(e),
}
// If the CQEs are consumed here, we should make the driver aware of it. We
// should not mask `awake` here, otherwise the driver may wait for the next
// event indefinitely.
//
// Anyway it is not a hot path, so we can afford an extra `write` syscall here.
self.poll_entries();
}
}
}
Expand Down Expand Up @@ -349,14 +353,19 @@ impl Driver {
closure = e.0;
std::thread::yield_now();
}
self.poll_blocking();
}

pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
instrument!(compio_log::Level::TRACE, "poll", ?timeout);
// Anyway we need to submit once, no matter if there are entries in squeue.

if self.poll_blocking() {
return Ok(());
}

trace!("start polling");

let need_wait = !self.notifier.reset();

if self.need_push_notifier {
#[allow(clippy::useless_conversion)]
self.push_raw(
Expand All @@ -369,10 +378,10 @@ impl Driver {
self.need_push_notifier = false;
}

if !self.poll_entries() {
self.submit_auto(timeout)?;
self.poll_entries();
}
self.submit_auto(timeout, need_wait)?;

self.notifier.set_awake();
self.poll_entries();

Ok(())
}
Expand Down
28 changes: 22 additions & 6 deletions compio-driver/src/sys/driver/iour/notify.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use rustix::event::{EventfdFlags, eventfd};

use super::*;
use crate::sys::driver::AwakeFlag;

#[derive(Debug)]
pub(super) struct Notifier {
Expand Down Expand Up @@ -29,6 +30,14 @@ impl Notifier {
Ok(())
}

pub fn set_awake(&self) {
self.notify.set_awake();
}

pub fn reset(&self) -> bool {
self.notify.reset()
}

pub fn waker(&self) -> Waker {
Waker::from(self.notify.clone())
}
Expand All @@ -50,18 +59,23 @@ impl AsRawFd for Notifier {
#[derive(Debug)]
pub(super) struct Notify {
fd: OwnedFd,
awake: AwakeFlag,
}

impl Notify {
pub fn new(fd: OwnedFd) -> Self {
Self { fd }
Self {
fd,
awake: AwakeFlag::new(),
}
}

/// Notify the inner driver.
pub fn notify(&self) -> io::Result<()> {
rustix::io::write(&self.fd, &u64::to_be_bytes(1))?;
pub fn set_awake(&self) {
self.awake.set();
}

Ok(())
pub fn reset(&self) -> bool {
self.awake.reset()
}
}

Expand All @@ -71,6 +85,8 @@ impl Wake for Notify {
}

fn wake_by_ref(self: &Arc<Self>) {
self.notify().ok();
if !self.awake.wake() {
rustix::io::write(&self.fd, &u64::to_be_bytes(1)).ok();
}
Comment thread
Berrysoft marked this conversation as resolved.
}
}
33 changes: 33 additions & 0 deletions compio-driver/src/sys/driver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::atomic::{AtomicU8, Ordering};

cfg_if::cfg_if! {
if #[cfg(windows)] {
mod iocp;
Expand All @@ -21,3 +23,34 @@ cfg_if::cfg_if! {

crate::assert_not_impl!(Driver, Send);
crate::assert_not_impl!(Driver, Sync);

const IDLE: u8 = 0b00;
const NOTIFIED: u8 = 0b01;
const AWAKE: u8 = 0b10;

#[derive(Debug)]
struct AwakeFlag(AtomicU8);

impl AwakeFlag {
pub fn new() -> Self {
Self(AtomicU8::new(IDLE))
}

/// Set the awake flag. It is true before the driver sleeps, and false after
/// it wakes up.
pub fn set(&self) {
self.0.fetch_or(AWAKE, Ordering::SeqCst);
}

/// Reset the flags. Returns true if it was notified.
pub fn reset(&self) -> bool {
(self.0.swap(IDLE, Ordering::SeqCst) & NOTIFIED) != 0
}

/// Set the notified flag. Returns true if the awake flag is set or the
/// notified flag is set. If the awake flag is not set, the driver needs
/// to be notified through a syscall.
pub fn wake(&self) -> bool {
self.0.fetch_or(NOTIFIED, Ordering::SeqCst) != 0
}
}
44 changes: 33 additions & 11 deletions compio-driver/src/sys/driver/poll/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
AsyncifyPool, Entry,
key::BorrowedKey,
panic::catch_unwind_io,
sys::{extra::PollExtra, prelude::*},
sys::{driver::AwakeFlag, extra::PollExtra, prelude::*},
};

#[derive(Debug, Default)]
Expand Down Expand Up @@ -436,15 +436,28 @@ impl Driver {
self.renew(fd, renew_event)
}

pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
pub fn poll(&mut self, mut timeout: Option<Duration>) -> io::Result<()> {
instrument!(compio_log::Level::TRACE, "poll", ?timeout);
if self.poll_completed() {
return Ok(());
let timeout_is_some = timeout.is_some();
let has_completed = !self.completed_rx.is_empty();
let need_wait = !self.notify.reset();
if !need_wait || has_completed {
timeout = Some(Duration::ZERO);
}
// We need to poll the poller first to make sure it handles the internal notify
// event (if any).
self.events.clear();
self.notify.poll.wait(&mut self.events, timeout)?;
if self.events.is_empty() && timeout.is_some() {
return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
self.notify.set_awake();
if self.events.is_empty() {
if self.poll_completed() {
return Ok(());
}
if timeout_is_some {
return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT));
}
} else if has_completed {
self.poll_completed();
}
self.with_events(|this, events| {
for event in events.iter() {
Expand Down Expand Up @@ -527,16 +540,23 @@ impl Entry {
/// A notify handle to the inner driver.
pub(crate) struct Notify {
poll: Poller,
awake: AwakeFlag,
}

impl Notify {
fn new(poll: Poller) -> Self {
Self { poll }
Self {
poll,
awake: AwakeFlag::new(),
}
}

fn set_awake(&self) {
self.awake.set();
}

/// Notify the inner driver.
pub fn notify(&self) -> io::Result<()> {
self.poll.notify()
fn reset(&self) -> bool {
self.awake.reset()
}
}

Expand All @@ -546,6 +566,8 @@ impl Wake for Notify {
}

fn wake_by_ref(self: &Arc<Self>) {
self.notify().ok();
if !self.awake.wake() {
self.poll.notify().ok();
}
Comment thread
Berrysoft marked this conversation as resolved.
}
}
1 change: 0 additions & 1 deletion compio-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ tracing-subscriber = { workspace = true, features = ["env-filter"] }
nix = { workspace = true, features = ["resource", "signal"] }

[features]
notify-always = []
enable_log = ["compio-log/enable_log"]

[lints.rust]
Expand Down
5 changes: 1 addition & 4 deletions compio-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,10 @@ pub struct ExecutorConfig {
/// The maximum number of hot tasks to run in each tick.
pub max_interval: u32,

/// A waker to be waken when a task is scheduled from other thread.
/// A waker to be woken when a task is scheduled.
///
/// This is useful for waking up drivers that switch to kernel state when
/// idle.
///
/// Enable `notify-always` feature to wake this waker on every schedule,
/// even if the executor is already awake.
pub waker: Option<Waker>,
}

Expand Down
4 changes: 1 addition & 3 deletions compio-executor/src/task/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ impl<'a> Local<'a> {

queue.make_hot(self.header().id);

if cfg!(feature = "notify-always")
&& let Some(ref waker) = shared.waker
{
if let Some(ref waker) = shared.waker {
waker.wake_by_ref()
}
}
Expand Down
3 changes: 0 additions & 3 deletions compio-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ synchrony = { workspace = true, features = ["async_flag"] }
time = []
async-fd = ["dep:compio-io"]

# Enable it to always notify the driver when a task schedules.
notify-always = ["compio-executor/notify-always"]

current_thread_id = []
nightly = ["current_thread_id"]

Expand Down
Loading
Loading