Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions compio-driver/src/sys/driver/iocp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use windows_sys::Win32::{Foundation::ERROR_OPERATION_ABORTED, System::IO::OVERLA
use crate::{
AsyncifyPool, DriverType, Entry, ErasedKey, ProactorBuilder,
control::Carrier,
sys::{extra::IocpExtra, prelude::*},
sys::{driver::AwakeFlag, extra::IocpExtra, prelude::*},
};

mod cp;
Expand Down Expand Up @@ -189,9 +189,11 @@ impl Driver {

if !has_entry {
for e in self.notify.port.poll(timeout)? {
self.notify.set_awake(true);
if let Some(e) = Self::create_entry(notify, &mut self.waits, e) {
e.notify()
}
self.notify.set_awake(false);
Comment thread
Berrysoft marked this conversation as resolved.
}
}

Expand All @@ -217,16 +219,20 @@ impl AsRawFd for Driver {
pub(crate) struct Notify {
port: cp::Port,
overlapped: Overlapped,
awake: AwakeFlag,
}

impl Notify {
fn new(port: cp::Port, overlapped: Overlapped) -> Self {
Self { port, overlapped }
Self {
port,
overlapped,
awake: AwakeFlag::new(),
}
}

/// Notify the inner driver.
pub fn notify(&self) -> io::Result<()> {
self.port.post_raw(&self.overlapped)
fn set_awake(&self, awake: bool) {
self.awake.set(awake);
}
}

Expand All @@ -236,6 +242,8 @@ impl Wake for Notify {
}

fn wake_by_ref(self: &Arc<Self>) {
self.notify().ok();
if !self.awake.wake() {
self.port.post_raw(&self.overlapped).ok();
}
}
}
37 changes: 25 additions & 12 deletions compio-driver/src/sys/driver/iour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,14 @@ impl Driver {
has_entry
}

fn poll_entries(&mut self) -> bool {
let mut has_entry = self.poll_blocking();

fn poll_entries(&mut self, hot_path: bool) -> bool {
let mut cqueue = self.inner.completion();
cqueue.sync();
has_entry |= !cqueue.is_empty();
let has_entry = !cqueue.is_empty();
// TODO: likely hint
if hot_path {
self.notifier.set_awake(true);
}
for entry in cqueue {
match entry.user_data() {
Self::CANCEL => {}
Expand All @@ -204,7 +206,9 @@ impl Driver {
if !more(flags) {
self.need_push_notifier = true;
}
self.notifier.clear().expect("cannot clear notifier");
if let Err(_e) = self.notifier.clear() {
error!("failed to clear notifier: {_e}");
}
}
key => {
let flags = entry.flags();
Expand All @@ -225,6 +229,9 @@ impl Driver {
}
}
}
if hot_path {
self.notifier.set_awake(false);
}
has_entry
}

Expand Down Expand Up @@ -276,7 +283,6 @@ impl Driver {
}
Err(_) => {
drop(squeue);
self.poll_entries();
match self.submit_auto(Some(Duration::ZERO)) {
Ok(()) => {}
Err(e)
Expand All @@ -286,6 +292,12 @@ impl Driver {
) => {}
Err(e) => return Err(e),
}
// If the CQEs are consumed here, we should make the driver aware of it. We
// should not mask `awake` here, otherwise the driver may wait for the next
// event indefinitely.
//
// Anyway it is not a hot path, so we can afford an extra `write` syscall here.
self.poll_entries(false);
}
}
}
Expand Down Expand Up @@ -343,12 +355,15 @@ impl Driver {
closure = e.0;
std::thread::yield_now();
}
self.poll_blocking();
}

pub fn poll(&mut self, timeout: Option<Duration>) -> io::Result<()> {
instrument!(compio_log::Level::TRACE, "poll", ?timeout);
// Anyway we need to submit once, no matter if there are entries in squeue.

if self.poll_blocking() {
return Ok(());
}

trace!("start polling");

if self.need_push_notifier {
Expand All @@ -363,10 +378,8 @@ impl Driver {
self.need_push_notifier = false;
}

if !self.poll_entries() {
self.submit_auto(timeout)?;
self.poll_entries();
}
self.submit_auto(timeout)?;
Comment thread
Berrysoft marked this conversation as resolved.
self.poll_entries(true);

Ok(())
}
Expand Down
22 changes: 15 additions & 7 deletions compio-driver/src/sys/driver/iour/notify.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use rustix::event::{EventfdFlags, eventfd};

use super::*;
use crate::sys::driver::AwakeFlag;

#[derive(Debug)]
pub(super) struct Notifier {
Expand Down Expand Up @@ -29,6 +30,10 @@ impl Notifier {
Ok(())
}

pub fn set_awake(&self, awake: bool) {
self.notify.set_awake(awake);
}

pub fn waker(&self) -> Waker {
Waker::from(self.notify.clone())
}
Expand All @@ -50,18 +55,19 @@ impl AsRawFd for Notifier {
#[derive(Debug)]
pub(super) struct Notify {
fd: OwnedFd,
awake: AwakeFlag,
}

impl Notify {
pub fn new(fd: OwnedFd) -> Self {
Self { fd }
Self {
fd,
awake: AwakeFlag::new(),
}
}

/// Notify the inner driver.
pub fn notify(&self) -> io::Result<()> {
rustix::io::write(&self.fd, &u64::to_be_bytes(1))?;

Ok(())
pub fn set_awake(&self, awake: bool) {
self.awake.set(awake);
}
}

Expand All @@ -71,6 +77,8 @@ impl Wake for Notify {
}

fn wake_by_ref(self: &Arc<Self>) {
self.notify().ok();
if !self.awake.wake() {
rustix::io::write(&self.fd, &u64::to_be_bytes(1)).ok();
}
Comment thread
Berrysoft marked this conversation as resolved.
}
}
19 changes: 19 additions & 0 deletions compio-driver/src/sys/driver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::atomic::{AtomicBool, Ordering};

cfg_if::cfg_if! {
if #[cfg(windows)] {
mod iocp;
Expand All @@ -21,3 +23,20 @@ cfg_if::cfg_if! {

crate::assert_not_impl!(Driver, Send);
crate::assert_not_impl!(Driver, Sync);

#[derive(Debug)]
struct AwakeFlag(AtomicBool);

impl AwakeFlag {
pub fn new() -> Self {
Self(AtomicBool::new(false))
}

pub fn set(&self, awake: bool) {
self.0.store(awake, Ordering::SeqCst);
}

pub fn wake(&self) -> bool {
self.0.swap(true, Ordering::SeqCst)
}
}
19 changes: 13 additions & 6 deletions compio-driver/src/sys/driver/poll/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
AsyncifyPool, Entry,
key::BorrowedKey,
panic::catch_unwind_io,
sys::{extra::PollExtra, prelude::*},
sys::{driver::AwakeFlag, extra::PollExtra, prelude::*},
};

#[derive(Debug, Default)]
Expand Down Expand Up @@ -172,8 +172,10 @@ impl Driver {
F: FnOnce(&mut Self, &mut Events) -> R,
{
let mut events = std::mem::take(&mut self.events);
self.notify.set_awake(true);
let res = f(self, &mut events);
self.events = events;
self.notify.set_awake(false);
res
}

Expand Down Expand Up @@ -527,16 +529,19 @@ impl Entry {
/// A notify handle to the inner driver.
pub(crate) struct Notify {
poll: Poller,
awake: AwakeFlag,
}

impl Notify {
fn new(poll: Poller) -> Self {
Self { poll }
Self {
poll,
awake: AwakeFlag::new(),
}
}

/// Notify the inner driver.
pub fn notify(&self) -> io::Result<()> {
self.poll.notify()
fn set_awake(&self, awake: bool) {
self.awake.set(awake);
}
}

Expand All @@ -546,6 +551,8 @@ impl Wake for Notify {
}

fn wake_by_ref(self: &Arc<Self>) {
self.notify().ok();
if !self.awake.wake() {
self.poll.notify().ok();
}
Comment thread
Berrysoft marked this conversation as resolved.
}
}
1 change: 0 additions & 1 deletion compio-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ tracing-subscriber = { workspace = true, features = ["env-filter"] }
nix = { workspace = true, features = ["resource", "signal"] }

[features]
notify-always = []
enable_log = ["compio-log/enable_log"]

[lints.rust]
Expand Down
5 changes: 1 addition & 4 deletions compio-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,10 @@ pub struct ExecutorConfig {
/// The maximum number of hot tasks to run in each tick.
pub max_interval: u32,

/// A waker to be waken when a task is scheduled from other thread.
/// A waker to be waken when a task is scheduled.
Comment thread
Berrysoft marked this conversation as resolved.
Outdated
///
/// This is useful for waking up drivers that switch to kernel state when
/// idle.
///
/// Enable `notify-always` feature to wake this waker on every schedule,
/// even if the executor is already awake.
pub waker: Option<Waker>,
Comment thread
Berrysoft marked this conversation as resolved.
Outdated
}

Expand Down
4 changes: 1 addition & 3 deletions compio-executor/src/task/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ impl<'a> Local<'a> {

queue.make_hot(self.header().id);

if cfg!(feature = "notify-always")
&& let Some(ref waker) = shared.waker
{
if let Some(ref waker) = shared.waker {
waker.wake_by_ref()
}
}
Expand Down
3 changes: 0 additions & 3 deletions compio-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ synchrony = { workspace = true, features = ["async_flag"] }
time = []
async-fd = ["dep:compio-io"]

# Enable it to always notify the driver when a task schedules.
notify-always = ["compio-executor/notify-always"]

current_thread_id = []
nightly = ["current_thread_id"]

Expand Down
16 changes: 3 additions & 13 deletions compio-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use std::{
io,
ops::Deref,
rc::Rc,
sync::Arc,
task::{Context, Poll, Waker},
time::Duration,
};
Expand All @@ -57,7 +56,7 @@ use compio_log::{debug, instrument};
use crate::affinity::bind_to_cpu_set;
#[cfg(feature = "time")]
use crate::time::{TimerFuture, TimerKey, TimerRuntime};
pub use crate::{attacher::*, cancel::CancelToken, future::*, waker::OptWaker};
pub use crate::{attacher::*, cancel::CancelToken, future::*};
Comment thread
Berrysoft marked this conversation as resolved.

scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);

Expand Down Expand Up @@ -184,19 +183,10 @@ impl Runtime {
self.driver.borrow().waker()
}

/// Low level API to control the runtime.
///
/// Create an optimized waker that only notifies the runtime when woken
/// from another thread, or when `notify-always` is enabled.
pub fn opt_waker(&self) -> Arc<OptWaker> {
OptWaker::new(self.waker())
}

/// Block on the future till it completes.
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
self.enter(|| {
let opt_waker = self.opt_waker();
let waker = Waker::from(opt_waker.clone());
let waker = self.waker();
let mut context = Context::from_waker(&waker);
let mut future = std::pin::pin!(future);
loop {
Expand All @@ -205,7 +195,7 @@ impl Runtime {
return result;
}
// We always want to reset the waker here.
Comment thread
Berrysoft marked this conversation as resolved.
Outdated
let remaining_tasks = self.run() | opt_waker.reset();
let remaining_tasks = self.run();
if remaining_tasks {
self.poll_with(Some(Duration::ZERO));
} else {
Expand Down
2 changes: 0 additions & 2 deletions compio-runtime/src/waker/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//! Waker related utilities.

mod ext;
mod opt;

pub(crate) use ext::*;
pub use opt::*;
Loading
Loading