Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base

private:
void
run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
run_task(std::unique_lock<std::mutex>& lock, context_type* ctx,
long timeout_us) override;
void interrupt_reactor() const override;
void update_timerfd() const;

Expand Down Expand Up @@ -294,9 +295,16 @@ epoll_scheduler::update_timerfd() const
}

inline void
epoll_scheduler::run_task(std::unique_lock<std::mutex>& lock, context_type* ctx)
epoll_scheduler::run_task(
std::unique_lock<std::mutex>& lock, context_type* ctx, long timeout_us)
{
int timeout_ms = task_interrupted_ ? 0 : -1;
int timeout_ms;
if (task_interrupted_)
timeout_ms = 0;
else if (timeout_us < 0)
timeout_ms = -1;
else
timeout_ms = static_cast<int>((timeout_us + 999) / 1000);

if (lock.owns_lock())
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ class BOOST_COROSIO_DECL kqueue_scheduler final : public reactor_scheduler_base

private:
void
run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
run_task(std::unique_lock<std::mutex>& lock, context_type* ctx,
long timeout_us) override;
void interrupt_reactor() const override;
long calculate_timeout(long requested_timeout_us) const;

Expand Down Expand Up @@ -285,9 +286,10 @@ kqueue_scheduler::calculate_timeout(long requested_timeout_us) const

inline void
kqueue_scheduler::run_task(
std::unique_lock<std::mutex>& lock, context_type* ctx)
std::unique_lock<std::mutex>& lock, context_type* ctx, long timeout_us)
{
long effective_timeout_us = task_interrupted_ ? 0 : calculate_timeout(-1);
long effective_timeout_us =
task_interrupted_ ? 0 : calculate_timeout(timeout_us);

if (lock.owns_lock())
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ class reactor_scheduler_base

/// Run the platform-specific reactor poll.
virtual void
run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) = 0;
run_task(std::unique_lock<std::mutex>& lock, context_type* ctx,
long timeout_us) = 0;

/// Wake a blocked reactor (e.g. write to eventfd or pipe).
virtual void interrupt_reactor() const = 0;
Expand Down Expand Up @@ -775,15 +776,16 @@ reactor_scheduler_base::do_one(
return 0;
}

task_interrupted_ = more_handlers || timeout_us == 0;
long task_timeout_us = more_handlers ? 0 : timeout_us;
task_interrupted_ = task_timeout_us == 0;
task_running_.store(true, std::memory_order_release);

if (more_handlers)
unlock_and_signal_one(lock);

try
{
run_task(lock, ctx);
run_task(lock, ctx, task_timeout_us);
}
catch (...)
{
Expand All @@ -793,6 +795,8 @@ reactor_scheduler_base::do_one(

task_running_.store(false, std::memory_order_relaxed);
completed_ops_.push(&task_op_);
if (timeout_us > 0)
return 0;
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler_base

private:
void
run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
run_task(std::unique_lock<std::mutex>& lock, context_type* ctx,
long timeout_us) override;
void interrupt_reactor() const override;
long calculate_timeout(long requested_timeout_us) const;

Expand Down Expand Up @@ -302,9 +303,10 @@ select_scheduler::calculate_timeout(long requested_timeout_us) const

inline void
select_scheduler::run_task(
std::unique_lock<std::mutex>& lock, context_type* ctx)
std::unique_lock<std::mutex>& lock, context_type* ctx, long timeout_us)
{
long effective_timeout_us = task_interrupted_ ? 0 : calculate_timeout(-1);
long effective_timeout_us =
task_interrupted_ ? 0 : calculate_timeout(timeout_us);

// Snapshot registered descriptors while holding lock.
// Record which fds need write monitoring to avoid a hot loop:
Expand Down
46 changes: 46 additions & 0 deletions test/unit/io_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,50 @@ struct io_context_test
BOOST_TEST(counter == 1);
}

void testRunForWithOutstandingWork()
{
io_context ioc;
auto ex = ioc.get_executor();

// Simulate persistent outstanding work (like a listening acceptor)
ex.on_work_started();

auto start = std::chrono::steady_clock::now();
std::size_t n = ioc.run_for(std::chrono::milliseconds(200));
auto elapsed = std::chrono::steady_clock::now() - start;

auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed)
.count();

// Must return after ~200ms, not block forever
BOOST_TEST(n == 0);
BOOST_TEST(ms >= 150);
BOOST_TEST(ms < 1000);

ex.on_work_finished();
}

void testRunOneForWithOutstandingWork()
{
io_context ioc;
auto ex = ioc.get_executor();

ex.on_work_started();

auto start = std::chrono::steady_clock::now();
std::size_t n = ioc.run_one_for(std::chrono::milliseconds(200));
auto elapsed = std::chrono::steady_clock::now() - start;

auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed)
.count();

BOOST_TEST(n == 0);
BOOST_TEST(ms >= 150);
BOOST_TEST(ms < 1000);

ex.on_work_finished();
}

void testExecutorRunningInThisThread()
{
io_context ioc;
Expand Down Expand Up @@ -610,6 +654,8 @@ struct io_context_test
testRunOneFor();
testRunOneUntil();
testRunFor();
testRunForWithOutstandingWork();
testRunOneForWithOutstandingWork();
testExecutorRunningInThisThread();
testMultithreaded();
testMultithreadedStress();
Expand Down
Loading