1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  

19  

20 -
#include <boost/corosio/native/native_scheduler.hpp>
20 +
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21 -
#include <boost/corosio/detail/scheduler_op.hpp>
 
22  

21  

23  
#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_op.hpp>
24  
#include <boost/corosio/detail/timer_service.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
25  
#include <boost/corosio/native/detail/make_err.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28  

27  

29 -
#include <boost/corosio/detail/thread_local_ptr.hpp>
 
30  
#include <boost/corosio/detail/except.hpp>
28  
#include <boost/corosio/detail/except.hpp>
31  

29  

32  
#include <atomic>
30  
#include <atomic>
33 -
#include <condition_variable>
 
34 -
#include <cstddef>
 
35  
#include <chrono>
31  
#include <chrono>
36 -
#include <limits>
 
37  
#include <cstdint>
32  
#include <cstdint>
38 -
#include <utility>
 
39  
#include <mutex>
33  
#include <mutex>
40  

34  

41 -
#include <fcntl.h>
 
42  
#include <errno.h>
35  
#include <errno.h>
43  
#include <sys/epoll.h>
36  
#include <sys/epoll.h>
44 -
#include <sys/socket.h>
 
45  
#include <sys/eventfd.h>
37  
#include <sys/eventfd.h>
46  
#include <sys/timerfd.h>
38  
#include <sys/timerfd.h>
47  
#include <unistd.h>
39  
#include <unistd.h>
48  

40  

49  
namespace boost::corosio::detail {
41  
namespace boost::corosio::detail {
50  

42  

51  
struct epoll_op;
43  
struct epoll_op;
52 -
namespace epoll {
 
53 -
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context;
 
54 -
} // namespace epoll
 
55  
struct descriptor_state;
44  
struct descriptor_state;
56  

45  

57  
/** Linux scheduler using epoll for I/O multiplexing.
46  
/** Linux scheduler using epoll for I/O multiplexing.
58  

47  

59  
    This scheduler implements the scheduler interface using Linux epoll
48  
    This scheduler implements the scheduler interface using Linux epoll
60  
    for efficient I/O event notification. It uses a single reactor model
49  
    for efficient I/O event notification. It uses a single reactor model
61  
    where one thread runs epoll_wait while other threads
50  
    where one thread runs epoll_wait while other threads
62  
    wait on a condition variable for handler work. This design provides:
51  
    wait on a condition variable for handler work. This design provides:
63  

52  

64  
    - Handler parallelism: N posted handlers can execute on N threads
53  
    - Handler parallelism: N posted handlers can execute on N threads
65  
    - No thundering herd: condition_variable wakes exactly one thread
54  
    - No thundering herd: condition_variable wakes exactly one thread
66  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
55  
    - IOCP parity: Behavior matches Windows I/O completion port semantics
67  

56  

68  
    When threads call run(), they first try to execute queued handlers.
57  
    When threads call run(), they first try to execute queued handlers.
69  
    If the queue is empty and no reactor is running, one thread becomes
58  
    If the queue is empty and no reactor is running, one thread becomes
70  
    the reactor and runs epoll_wait. Other threads wait on a condition
59  
    the reactor and runs epoll_wait. Other threads wait on a condition
71  
    variable until handlers are available.
60  
    variable until handlers are available.
72  

61  

73  
    @par Thread Safety
62  
    @par Thread Safety
74  
    All public member functions are thread-safe.
63  
    All public member functions are thread-safe.
75  
*/
64  
*/
76 -
class BOOST_COROSIO_DECL epoll_scheduler final
65 +
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
77 -
    : public native_scheduler
 
78 -
    , public capy::execution_context::service
 
79  
{
66  
{
80 -
    using key_type = scheduler;
 
81 -

 
82  
public:
67  
public:
83  
    /** Construct the scheduler.
68  
    /** Construct the scheduler.
84  

69  

85  
        Creates an epoll instance, eventfd for reactor interruption,
70  
        Creates an epoll instance, eventfd for reactor interruption,
86  
        and timerfd for kernel-managed timer expiry.
71  
        and timerfd for kernel-managed timer expiry.
87  

72  

88  
        @param ctx Reference to the owning execution_context.
73  
        @param ctx Reference to the owning execution_context.
89  
        @param concurrency_hint Hint for expected thread count (unused).
74  
        @param concurrency_hint Hint for expected thread count (unused).
90  
    */
75  
    */
91  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
76  
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
92  

77  

93  
    /// Destroy the scheduler.
78  
    /// Destroy the scheduler.
94  
    ~epoll_scheduler() override;
79  
    ~epoll_scheduler() override;
95  

80  

96  
    epoll_scheduler(epoll_scheduler const&)            = delete;
81  
    epoll_scheduler(epoll_scheduler const&)            = delete;
97  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
82  
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;
98  

83  

 
84 +
    /// Shut down the scheduler, draining pending operations.
99 -
    void post(std::coroutine_handle<> h) const override;
 
100 -
    void post(scheduler_op* h) const override;
 
101 -
    bool running_in_this_thread() const noexcept override;
 
102 -
    void stop() override;
 
103 -
    bool stopped() const noexcept override;
 
104 -
    void restart() override;
 
105 -
    std::size_t run() override;
 
106 -
    std::size_t run_one() override;
 
107 -
    std::size_t wait_one(long usec) override;
 
108 -
    std::size_t poll() override;
 
109 -
    std::size_t poll_one() override;
 
110  
    void shutdown() override;
85  
    void shutdown() override;
111  

86  

112  
    /** Return the epoll file descriptor.
87  
    /** Return the epoll file descriptor.
113  

88  

114  
        Used by socket services to register file descriptors
89  
        Used by socket services to register file descriptors
115  
        for I/O event notification.
90  
        for I/O event notification.
116  

91  

117  
        @return The epoll file descriptor.
92  
        @return The epoll file descriptor.
118  
    */
93  
    */
119  
    int epoll_fd() const noexcept
94  
    int epoll_fd() const noexcept
120  
    {
95  
    {
121  
        return epoll_fd_;
96  
        return epoll_fd_;
122  
    }
97  
    }
123 -
    /** Reset the thread's inline completion budget.
 
124 -

 
125 -
        Called at the start of each posted completion handler to
 
126 -
        grant a fresh budget for speculative inline completions.
 
127 -
    */
 
128 -
    void reset_inline_budget() const noexcept;
 
129 -

 
130 -
    /** Consume one unit of inline budget if available.
 
131 -

 
132 -
        @return True if budget was available and consumed.
 
133 -
    */
 
134 -
    bool try_consume_inline_budget() const noexcept;
 
135 -

 
136  

98  

137  
    /** Register a descriptor for persistent monitoring.
99  
    /** Register a descriptor for persistent monitoring.
138  

100  

139  
        The fd is registered once and stays registered until explicitly
101  
        The fd is registered once and stays registered until explicitly
140  
        deregistered. Events are dispatched via descriptor_state which
102  
        deregistered. Events are dispatched via descriptor_state which
141  
        tracks pending read/write/connect operations.
103  
        tracks pending read/write/connect operations.
142  

104  

143  
        @param fd The file descriptor to register.
105  
        @param fd The file descriptor to register.
144  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
106  
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
145  
    */
107  
    */
146  
    void register_descriptor(int fd, descriptor_state* desc) const;
108  
    void register_descriptor(int fd, descriptor_state* desc) const;
147  

109  

148  
    /** Deregister a persistently registered descriptor.
110  
    /** Deregister a persistently registered descriptor.
149  

111  

150  
        @param fd The file descriptor to deregister.
112  
        @param fd The file descriptor to deregister.
151  
    */
113  
    */
152  
    void deregister_descriptor(int fd) const;
114  
    void deregister_descriptor(int fd) const;
153 -
    void work_started() noexcept override;
 
154 -
    void work_finished() noexcept override;
 
155 -

 
156 -
    /** Offset a forthcoming work_finished from work_cleanup.
 
157 -

 
158 -
        Called by descriptor_state when all I/O returned EAGAIN and no
 
159 -
        handler will be executed. Must be called from a scheduler thread.
 
160 -
    */
 
161 -
    void compensating_work_started() const noexcept;
 
162 -

 
163 -
    /** Drain work from thread context's private queue to global queue.
 
164 -

 
165 -
        Called by thread_context_guard destructor when a thread exits run().
 
166 -
        Transfers pending work to the global queue under mutex protection.
 
167 -

 
168 -
        @param queue The private queue to drain.
 
169 -
        @param count Item count for wakeup decisions (wakes other threads if positive).
 
170 -
    */
 
171 -
    void drain_thread_queue(op_queue& queue, long count) const;
 
172 -

 
173 -
    /** Post completed operations for deferred invocation.
 
174 -

 
175 -
        If called from a thread running this scheduler, operations go to
 
176 -
        the thread's private queue (fast path). Otherwise, operations are
 
177 -
        added to the global queue under mutex and a waiter is signaled.
 
178 -

 
179 -
        @par Preconditions
 
180 -
        work_started() must have been called for each operation.
 
181 -

 
182 -
        @param ops Queue of operations to post.
 
183 -
    */
 
184 -
    void post_deferred_completions(op_queue& ops) const;
 
185 -

 
186  

115  

187 -
    struct work_cleanup
 
188 -
    {
 
189 -
        epoll_scheduler* scheduler;
 
190 -
        std::unique_lock<std::mutex>* lock;
 
191 -
        epoll::scheduler_context* ctx;
 
192 -
        ~work_cleanup();
 
193 -
    };
 
194 -

 
195 -
    struct task_cleanup
 
196 -
    {
 
197 -
        epoll_scheduler const* scheduler;
 
198 -
        std::unique_lock<std::mutex>* lock;
 
199 -
        epoll::scheduler_context* ctx;
 
200 -
        ~task_cleanup();
 
201 -
    };
 
202 -

 
203 -
    std::size_t do_one(
 
204 -
        std::unique_lock<std::mutex>& lock,
 
205 -
        long timeout_us,
 
206 -
        epoll::scheduler_context* ctx);
 
207  
private:
116  
private:
208  
    void
117  
    void
209 -
    run_task(std::unique_lock<std::mutex>& lock, epoll::scheduler_context* ctx);
118 +
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
210 -
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
119 +
    void interrupt_reactor() const override;
211 -
    void interrupt_reactor() const;
 
212  
    void update_timerfd() const;
120  
    void update_timerfd() const;
213 -
    /** Set the signaled state and wake all waiting threads.
 
214 -

 
215 -
        @par Preconditions
 
216 -
        Mutex must be held.
 
217 -

 
218 -
        @param lock The held mutex lock.
 
219 -
    */
 
220 -
    void signal_all(std::unique_lock<std::mutex>& lock) const;
 
221 -

 
222 -
    /** Set the signaled state and wake one waiter if any exist.
 
223 -

 
224 -
        Only unlocks and signals if at least one thread is waiting.
 
225 -
        Use this when the caller needs to perform a fallback action
 
226 -
        (such as interrupting the reactor) when no waiters exist.
 
227 -

 
228 -
        @par Preconditions
 
229 -
        Mutex must be held.
 
230 -

 
231 -
        @param lock The held mutex lock.
 
232 -

 
233 -
        @return `true` if unlocked and signaled, `false` if lock still held.
 
234 -
    */
 
235 -
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
 
236 -

 
237 -
    /** Set the signaled state, unlock, and wake one waiter if any exist.
 
238 -

 
239 -
        Always unlocks the mutex. Use this when the caller will release
 
240 -
        the lock regardless of whether a waiter exists.
 
241 -

 
242 -
        @par Preconditions
 
243 -
        Mutex must be held.
 
244 -

 
245 -
        @param lock The held mutex lock.
 
246 -

 
247 -
        @return `true` if a waiter was signaled, `false` otherwise.
 
248 -
    */
 
249 -
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
 
250 -

 
251 -
    /** Clear the signaled state before waiting.
 
252 -

 
253 -
        @par Preconditions
 
254 -
        Mutex must be held.
 
255 -
    */
 
256 -
    void clear_signal() const;
 
257 -

 
258 -
    /** Block until the signaled state is set.
 
259 -

 
260 -
        Returns immediately if already signaled (fast-path). Otherwise
 
261 -
        increments the waiter count, waits on the condition variable,
 
262 -
        and decrements the waiter count upon waking.
 
263 -

 
264 -
        @par Preconditions
 
265 -
        Mutex must be held.
 
266 -

 
267 -
        @param lock The held mutex lock.
 
268 -
    */
 
269 -
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
 
270 -

 
271 -
    /** Block until signaled or timeout expires.
 
272 -

 
273 -
        @par Preconditions
 
274 -
        Mutex must be held.
 
275 -

 
276 -
        @param lock The held mutex lock.
 
277 -
        @param timeout_us Maximum time to wait in microseconds.
 
278 -
    */
 
279 -
    void wait_for_signal_for(
 
280 -
        std::unique_lock<std::mutex>& lock, long timeout_us) const;
 
281 -

 
282  

121  

283  
    int epoll_fd_;
122  
    int epoll_fd_;
284 -
    int event_fd_; // for interrupting reactor
123 +
    int event_fd_;
285 -
    int timer_fd_; // timerfd for kernel-managed timer expiry
124 +
    int timer_fd_;
286 -
    mutable std::mutex mutex_;
 
287 -
    mutable std::condition_variable cond_;
 
288 -
    mutable op_queue completed_ops_;
 
289 -
    mutable std::atomic<long> outstanding_work_;
 
290 -
    bool stopped_;
 
291 -

 
292 -
    // True while a thread is blocked in epoll_wait. Used by
 
293 -
    // wake_one_thread_and_unlock and work_finished to know when
 
294 -
    // an eventfd interrupt is needed instead of a condvar signal.
 
295 -
    mutable std::atomic<bool> task_running_{false};
 
296 -

 
297 -
    // True when the reactor has been told to do a non-blocking poll
 
298 -
    // (more handlers queued or poll mode). Prevents redundant eventfd
 
299 -
    // writes and controls the epoll_wait timeout.
 
300 -
    mutable bool task_interrupted_ = false;
 
301 -

 
302 -
    // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
 
303 -
    mutable std::size_t state_ = 0;
 
304  

125  

305  
    // Edge-triggered eventfd state
126  
    // Edge-triggered eventfd state
306  
    mutable std::atomic<bool> eventfd_armed_{false};
127  
    mutable std::atomic<bool> eventfd_armed_{false};
307  

128  

308 -
    // blocks. Avoids timerfd_settime syscalls for timers that are
 
309 -
    // scheduled then cancelled without being waited on.
 
310  
    // Set when the earliest timer changes; flushed before epoll_wait
129  
    // Set when the earliest timer changes; flushed before epoll_wait
311 -

 
312 -
    // Sentinel operation for interleaving reactor runs with handler execution.
 
313 -
    // Ensures the reactor runs periodically even when handlers are continuously
 
314 -
    // posted, preventing starvation of I/O events, timers, and signals.
 
315 -
    struct task_op final : scheduler_op
 
316 -
    {
 
317 -
        void operator()() override {}
 
318 -
        void destroy() override {}
 
319 -
    };
 
320 -
    task_op task_op_;
 
321  
    mutable std::atomic<bool> timerfd_stale_{false};
130  
    mutable std::atomic<bool> timerfd_stale_{false};
322  
};
131  
};
323 -
//--------------------------------------------------------------------------
 
324 -
//
 
325 -
// Implementation
 
326 -
//
 
327 -
//--------------------------------------------------------------------------
 
328 -

 
329 -
/*
 
330 -
    epoll Scheduler - Single Reactor Model
 
331 -
    ======================================
 
332 -

 
333 -
    This scheduler uses a thread coordination strategy to provide handler
 
334 -
    parallelism and avoid the thundering herd problem.
 
335 -
    Instead of all threads blocking on epoll_wait(), one thread becomes the
 
336 -
    "reactor" while others wait on a condition variable for handler work.
 
337 -

 
338 -
    Thread Model
 
339 -
    ------------
 
340 -
    - ONE thread runs epoll_wait() at a time (the reactor thread)
 
341 -
    - OTHER threads wait on cond_ (condition variable) for handlers
 
342 -
    - When work is posted, exactly one waiting thread wakes via notify_one()
 
343 -
    - This matches Windows IOCP semantics where N posted items wake N threads
 
344 -

 
345 -
    Event Loop Structure (do_one)
 
346 -
    -----------------------------
 
347 -
    1. Lock mutex, try to pop handler from queue
 
348 -
    2. If got handler: execute it (unlocked), return
 
349 -
    3. If queue empty and no reactor running: become reactor
 
350 -
       - Run epoll_wait (unlocked), queue I/O completions, loop back
 
351 -
    4. If queue empty and reactor running: wait on condvar for work
 
352 -

 
353 -
    The task_running_ flag ensures only one thread owns epoll_wait().
 
354 -
    After the reactor queues I/O completions, it loops back to try getting
 
355 -
    a handler, giving priority to handler execution over more I/O polling.
 
356 -

 
357 -
    Signaling State (state_)
 
358 -
    ------------------------
 
359 -
    The state_ variable encodes two pieces of information:
 
360 -
    - Bit 0: signaled flag (1 = signaled, persists until cleared)
 
361 -
    - Upper bits: waiter count (each waiter adds 2 before blocking)
 
362 -

 
363 -
    This allows efficient coordination:
 
364 -
    - Signalers only call notify when waiters exist (state_ > 1)
 
365 -
    - Waiters check if already signaled before blocking (fast-path)
 
366 -

 
367 -
    Wake Coordination (wake_one_thread_and_unlock)
 
368 -
    ----------------------------------------------
 
369 -
    When posting work:
 
370 -
    - If waiters exist (state_ > 1): signal and notify_one()
 
371 -
    - Else if reactor running: interrupt via eventfd write
 
372 -
    - Else: no-op (thread will find work when it checks queue)
 
373 -

 
374 -
    This avoids waking threads unnecessarily. With cascading wakes,
 
375 -
    each handler execution wakes at most one additional thread if
 
376 -
    more work exists in the queue.
 
377 -

 
378 -
    Work Counting
 
379 -
    -------------
 
380 -
    outstanding_work_ tracks pending operations. When it hits zero, run()
 
381 -
    returns. Each operation increments on start, decrements on completion.
 
382 -

 
383 -
    Timer Integration
 
384 -
    -----------------
 
385 -
    Timers are handled by timer_service. The reactor adjusts epoll_wait
 
386 -
    timeout to wake for the nearest timer expiry. When a new timer is
 
387 -
    scheduled earlier than current, timer_service calls interrupt_reactor()
 
388 -
    to re-evaluate the timeout.
 
389 -
*/
 
390 -

 
391 -
namespace epoll {
 
392 -

 
393 -
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
 
394 -
{
 
395 -
    epoll_scheduler const* key;
 
396 -
    scheduler_context* next;
 
397 -
    op_queue private_queue;
 
398 -
    long private_outstanding_work;
 
399 -
    int inline_budget;
 
400 -
    int inline_budget_max;
 
401 -
    bool unassisted;
 
402 -

 
403 -
    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
 
404 -
        : key(k)
 
405 -
        , next(n)
 
406 -
        , private_outstanding_work(0)
 
407 -
        , inline_budget(0)
 
408 -
        , inline_budget_max(2)
 
409 -
        , unassisted(false)
 
410 -
    {
 
411 -
    }
 
412 -
};
 
413 -

 
414 -
inline thread_local_ptr<scheduler_context> context_stack;
 
415 -

 
416 -
struct thread_context_guard
 
417 -
{
 
418 -
    scheduler_context frame_;
 
419 -

 
420 -
    explicit thread_context_guard(epoll_scheduler const* ctx) noexcept
 
421 -
        : frame_(ctx, context_stack.get())
 
422 -
    {
 
423 -
        context_stack.set(&frame_);
 
424 -
    }
 
425 -

 
426 -
    ~thread_context_guard() noexcept
 
427 -
    {
 
428 -
        if (!frame_.private_queue.empty())
 
429 -
            frame_.key->drain_thread_queue(
 
430 -
                frame_.private_queue, frame_.private_outstanding_work);
 
431 -
        context_stack.set(frame_.next);
 
432 -
    }
 
433 -
};
 
434 -

 
435 -
inline scheduler_context*
 
436 -
find_context(epoll_scheduler const* self) noexcept
 
437 -
{
 
438 -
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
 
439 -
        if (c->key == self)
 
440 -
            return c;
 
441 -
    return nullptr;
 
442 -
}
 
443 -

 
444 -
} // namespace epoll
 
445 -

 
446 -
inline void
 
447 -
epoll_scheduler::reset_inline_budget() const noexcept
 
448 -
{
 
449 -
    if (auto* ctx = epoll::find_context(this))
 
450 -
    {
 
451 -
        // Cap when no other thread absorbed queued work. A moderate
 
452 -
        // cap (4) amortizes scheduling for small buffers while avoiding
 
453 -
        // bursty I/O that fills socket buffers and stalls large transfers.
 
454 -
        if (ctx->unassisted)
 
455 -
        {
 
456 -
            ctx->inline_budget_max = 4;
 
457 -
            ctx->inline_budget     = 4;
 
458 -
            return;
 
459 -
        }
 
460 -
        // Ramp up when previous cycle fully consumed budget.
 
461 -
        // Reset on partial consumption (EAGAIN hit or peer got scheduled).
 
462 -
        if (ctx->inline_budget == 0)
 
463 -
            ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
 
464 -
        else if (ctx->inline_budget < ctx->inline_budget_max)
 
465 -
            ctx->inline_budget_max = 2;
 
466 -
        ctx->inline_budget = ctx->inline_budget_max;
 
467 -
    }
 
468 -
}
 
469 -

 
470 -
inline bool
 
471 -
epoll_scheduler::try_consume_inline_budget() const noexcept
 
472 -
{
 
473 -
    if (auto* ctx = epoll::find_context(this))
 
474 -
    {
 
475 -
        if (ctx->inline_budget > 0)
 
476 -
        {
 
477 -
            --ctx->inline_budget;
 
478 -
            return true;
 
479 -
        }
 
480 -
    }
 
481 -
    return false;
 
482 -
}
 
483 -

 
484 -
inline void
 
485 -
descriptor_state::operator()()
 
486 -
{
 
487 -
    is_enqueued_.store(false, std::memory_order_relaxed);
 
488 -

 
489 -
    // Take ownership of impl ref set by close_socket() to prevent
 
490 -
    // the owning impl from being freed while we're executing
 
491 -
    auto prevent_impl_destruction = std::move(impl_ref_);
 
492 -

 
493 -
    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
 
494 -
    if (ev == 0)
 
495 -
    {
 
496 -
        scheduler_->compensating_work_started();
 
497 -
        return;
 
498 -
    }
 
499 -

 
500 -
    op_queue local_ops;
 
501 -

 
502 -
    int err = 0;
 
503 -
    if (ev & EPOLLERR)
 
504 -
    {
 
505 -
        socklen_t len = sizeof(err);
 
506 -
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
 
507 -
            err = errno;
 
508 -
        if (err == 0)
 
509 -
            err = EIO;
 
510 -
    }
 
511 -

 
512 -
    {
 
513 -
        std::lock_guard lock(mutex);
 
514 -
        if (ev & EPOLLIN)
 
515 -
        {
 
516 -
            if (read_op)
 
517 -
            {
 
518 -
                auto* rd = read_op;
 
519 -
                if (err)
 
520 -
                    rd->complete(err, 0);
 
521 -
                else
 
522 -
                    rd->perform_io();
 
523 -

 
524 -
                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
 
525 -
                {
 
526 -
                    rd->errn = 0;
 
527 -
                }
 
528 -
                else
 
529 -
                {
 
530 -
                    read_op = nullptr;
 
531 -
                    local_ops.push(rd);
 
532 -
                }
 
533 -
            }
 
534 -
            else
 
535 -
            {
 
536 -
                read_ready = true;
 
537 -
            }
 
538 -
        }
 
539 -
        if (ev & EPOLLOUT)
 
540 -
        {
 
541 -
            bool had_write_op = (connect_op || write_op);
 
542 -
            if (connect_op)
 
543 -
            {
 
544 -
                auto* cn = connect_op;
 
545 -
                if (err)
 
546 -
                    cn->complete(err, 0);
 
547 -
                else
 
548 -
                    cn->perform_io();
 
549 -
                connect_op = nullptr;
 
550 -
                local_ops.push(cn);
 
551 -
            }
 
552 -
            if (write_op)
 
553 -
            {
 
554 -
                auto* wr = write_op;
 
555 -
                if (err)
 
556 -
                    wr->complete(err, 0);
 
557 -
                else
 
558 -
                    wr->perform_io();
 
559 -

 
560 -
                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
 
561 -
                {
 
562 -
                    wr->errn = 0;
 
563 -
                }
 
564 -
                else
 
565 -
                {
 
566 -
                    write_op = nullptr;
 
567 -
                    local_ops.push(wr);
 
568 -
                }
 
569 -
            }
 
570 -
            if (!had_write_op)
 
571 -
                write_ready = true;
 
572 -
        }
 
573 -
        if (err)
 
574 -
        {
 
575 -
            if (read_op)
 
576 -
            {
 
577 -
                read_op->complete(err, 0);
 
578 -
                local_ops.push(std::exchange(read_op, nullptr));
 
579 -
            }
 
580 -
            if (write_op)
 
581 -
            {
 
582 -
                write_op->complete(err, 0);
 
583 -
                local_ops.push(std::exchange(write_op, nullptr));
 
584 -
            }
 
585 -
            if (connect_op)
 
586 -
            {
 
587 -
                connect_op->complete(err, 0);
 
588 -
                local_ops.push(std::exchange(connect_op, nullptr));
 
589 -
            }
 
590 -
        }
 
591 -
    }
 
592 -

 
593 -
    // Execute first handler inline — the scheduler's work_cleanup
 
594 -
    // accounts for this as the "consumed" work item
 
595 -
    scheduler_op* first = local_ops.pop();
 
596 -
    if (first)
 
597 -
    {
 
598 -
        scheduler_->post_deferred_completions(local_ops);
 
599 -
        (*first)();
 
600 -
    }
 
601 -
    else
 
602 -
    {
 
603 -
        scheduler_->compensating_work_started();
 
604 -
    }
 
605 -
}
 
606 -

 
607  

132  

608  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
133  
inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
609  
    : epoll_fd_(-1)
134  
    : epoll_fd_(-1)
610  
    , event_fd_(-1)
135  
    , event_fd_(-1)
611 -
    , outstanding_work_(0)
 
612 -
    , stopped_(false)
 
613 -
    , task_running_{false}
 
614 -
    , task_interrupted_(false)
 
615 -
    , state_(0)
 
616  
    , timer_fd_(-1)
136  
    , timer_fd_(-1)
617  
{
137  
{
618  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
138  
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
619  
    if (epoll_fd_ < 0)
139  
    if (epoll_fd_ < 0)
620  
        detail::throw_system_error(make_err(errno), "epoll_create1");
140  
        detail::throw_system_error(make_err(errno), "epoll_create1");
621  

141  

622  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
142  
    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
623  
    if (event_fd_ < 0)
143  
    if (event_fd_ < 0)
624  
    {
144  
    {
625  
        int errn = errno;
145  
        int errn = errno;
626  
        ::close(epoll_fd_);
146  
        ::close(epoll_fd_);
627  
        detail::throw_system_error(make_err(errn), "eventfd");
147  
        detail::throw_system_error(make_err(errn), "eventfd");
628  
    }
148  
    }
629  

149  

630  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
150  
    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
631  
    if (timer_fd_ < 0)
151  
    if (timer_fd_ < 0)
632  
    {
152  
    {
633  
        int errn = errno;
153  
        int errn = errno;
634  
        ::close(event_fd_);
154  
        ::close(event_fd_);
635  
        ::close(epoll_fd_);
155  
        ::close(epoll_fd_);
636  
        detail::throw_system_error(make_err(errn), "timerfd_create");
156  
        detail::throw_system_error(make_err(errn), "timerfd_create");
637  
    }
157  
    }
638  

158  

639  
    epoll_event ev{};
159  
    epoll_event ev{};
640  
    ev.events   = EPOLLIN | EPOLLET;
160  
    ev.events   = EPOLLIN | EPOLLET;
641  
    ev.data.ptr = nullptr;
161  
    ev.data.ptr = nullptr;
642  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
162  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
643  
    {
163  
    {
644  
        int errn = errno;
164  
        int errn = errno;
645  
        ::close(timer_fd_);
165  
        ::close(timer_fd_);
646  
        ::close(event_fd_);
166  
        ::close(event_fd_);
647  
        ::close(epoll_fd_);
167  
        ::close(epoll_fd_);
648  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
168  
        detail::throw_system_error(make_err(errn), "epoll_ctl");
649  
    }
169  
    }
650  

170  

651  
    epoll_event timer_ev{};
171  
    epoll_event timer_ev{};
652  
    timer_ev.events   = EPOLLIN | EPOLLERR;
172  
    timer_ev.events   = EPOLLIN | EPOLLERR;
653  
    timer_ev.data.ptr = &timer_fd_;
173  
    timer_ev.data.ptr = &timer_fd_;
654  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
174  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
655  
    {
175  
    {
656  
        int errn = errno;
176  
        int errn = errno;
657  
        ::close(timer_fd_);
177  
        ::close(timer_fd_);
658  
        ::close(event_fd_);
178  
        ::close(event_fd_);
659  
        ::close(epoll_fd_);
179  
        ::close(epoll_fd_);
660  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
180  
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
661  
    }
181  
    }
662  

182  

663  
    timer_svc_ = &get_timer_service(ctx, *this);
183  
    timer_svc_ = &get_timer_service(ctx, *this);
664  
    timer_svc_->set_on_earliest_changed(
184  
    timer_svc_->set_on_earliest_changed(
665  
        timer_service::callback(this, [](void* p) {
185  
        timer_service::callback(this, [](void* p) {
666  
            auto* self = static_cast<epoll_scheduler*>(p);
186  
            auto* self = static_cast<epoll_scheduler*>(p);
667  
            self->timerfd_stale_.store(true, std::memory_order_release);
187  
            self->timerfd_stale_.store(true, std::memory_order_release);
668 -
            if (self->task_running_.load(std::memory_order_acquire))
188 +
            self->interrupt_reactor();
669 -
                self->interrupt_reactor();
 
670  
        }));
189  
        }));
671 -
    // Initialize resolver service
 
672  

190  

673 -

 
674 -
    // Initialize signal service
 
675  
    get_resolver_service(ctx, *this);
191  
    get_resolver_service(ctx, *this);
676  
    get_signal_service(ctx, *this);
192  
    get_signal_service(ctx, *this);
677 -
    // Push task sentinel to interleave reactor runs with handler execution
 
678  

193  

679  
    completed_ops_.push(&task_op_);
194  
    completed_ops_.push(&task_op_);
680  
}
195  
}
681  

196  

682  
inline epoll_scheduler::~epoll_scheduler()
{
    // Release the kernel objects owned by this scheduler. A value of -1
    // marks a descriptor that was never created (or whose creation failed
    // partway through construction), so it must not be closed.
    auto const close_if_open = [](int fd) {
        if (fd >= 0)
            ::close(fd);
    };
    close_if_open(timer_fd_);
    close_if_open(event_fd_);
    close_if_open(epoll_fd_);
}
691  

206  

692  
inline void
epoll_scheduler::shutdown()
{
    // Drain remaining queued work via the shared reactor base helper
    // (presumably destroys handlers that were queued but never ran —
    // see reactor_scheduler::shutdown_drain for the exact contract).
    shutdown_drain();

    // Wake the reactor so a thread blocked in epoll_wait() can observe
    // the shutdown. event_fd_ is -1 if construction never got that far.
    if (event_fd_ >= 0)
        interrupt_reactor();
}
713  

215  

714 -
epoll_scheduler::post(std::coroutine_handle<> h) const
 
715 -
{
 
716 -
    struct post_handler final : scheduler_op
 
717 -
    {
 
718 -
        std::coroutine_handle<> h_;
 
719 -

 
720 -
        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
 
721 -

 
722 -
        ~post_handler() override = default;
 
723 -

 
724 -
        void operator()() override
 
725 -
        {
 
726 -
            auto h = h_;
 
727 -
            delete this;
 
728 -
            h.resume();
 
729 -
        }
 
730 -

 
731 -
        void destroy() override
 
732 -
        {
 
733 -
            auto h = h_;
 
734 -
            delete this;
 
735 -
            h.destroy();
 
736 -
        }
 
737 -
    };
 
738 -

 
739 -
    auto ph = std::make_unique<post_handler>(h);
 
740 -

 
741 -
    // Fast path: same thread posts to private queue
 
742 -
    // Only count locally; work_cleanup batches to global counter
 
743 -
    if (auto* ctx = epoll::find_context(this))
 
744 -
    {
 
745 -
        ++ctx->private_outstanding_work;
 
746 -
        ctx->private_queue.push(ph.release());
 
747 -
        return;
 
748 -
    }
 
749 -

 
750 -
    // Slow path: cross-thread post requires mutex
 
751 -
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
 
752 -

 
753 -
    std::unique_lock lock(mutex_);
 
754 -
    completed_ops_.push(ph.release());
 
755 -
    wake_one_thread_and_unlock(lock);
 
756 -
}
 
757 -

 
758 -
inline void
 
759 -
epoll_scheduler::post(scheduler_op* h) const
 
760 -
{
 
761 -
    // Fast path: same thread posts to private queue
 
762 -
    // Only count locally; work_cleanup batches to global counter
 
763 -
    if (auto* ctx = epoll::find_context(this))
 
764 -
    {
 
765 -
        ++ctx->private_outstanding_work;
 
766 -
        ctx->private_queue.push(h);
 
767 -
        return;
 
768 -
    }
 
769 -

 
770 -
    // Slow path: cross-thread post requires mutex
 
771 -
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
 
772 -

 
773 -
    std::unique_lock lock(mutex_);
 
774 -
    completed_ops_.push(h);
 
775 -
    wake_one_thread_and_unlock(lock);
 
776 -
}
 
777 -

 
778 -
inline bool
 
779 -
epoll_scheduler::running_in_this_thread() const noexcept
 
780 -
{
 
781 -
    for (auto* c = epoll::context_stack.get(); c != nullptr; c = c->next)
 
782 -
        if (c->key == this)
 
783 -
            return true;
 
784 -
    return false;
 
785 -
}
 
786 -

 
787 -
inline void
 
788 -
epoll_scheduler::stop()
 
789 -
{
 
790 -
    std::unique_lock lock(mutex_);
 
791 -
    if (!stopped_)
 
792 -
    {
 
793 -
        stopped_ = true;
 
794 -
        signal_all(lock);
 
795 -
        interrupt_reactor();
 
796 -
    }
 
797 -
}
 
798 -

 
799 -
inline bool
 
800 -
epoll_scheduler::stopped() const noexcept
 
801 -
{
 
802 -
    std::unique_lock lock(mutex_);
 
803 -
    return stopped_;
 
804 -
}
 
805 -

 
806 -
inline void
 
807 -
epoll_scheduler::restart()
 
808 -
{
 
809 -
    std::unique_lock lock(mutex_);
 
810 -
    stopped_ = false;
 
811 -
}
 
812 -

 
813 -
inline std::size_t
 
814 -
epoll_scheduler::run()
 
815 -
{
 
816 -
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
 
817 -
    {
 
818 -
        stop();
 
819 -
        return 0;
 
820 -
    }
 
821 -

 
822 -
    epoll::thread_context_guard ctx(this);
 
823 -
    std::unique_lock lock(mutex_);
 
824 -

 
825 -
    std::size_t n = 0;
 
826 -
    for (;;)
 
827 -
    {
 
828 -
        if (!do_one(lock, -1, &ctx.frame_))
 
829 -
            break;
 
830 -
        if (n != (std::numeric_limits<std::size_t>::max)())
 
831 -
            ++n;
 
832 -
        if (!lock.owns_lock())
 
833 -
            lock.lock();
 
834 -
    }
 
835 -
    return n;
 
836 -
}
 
837 -

 
838 -
inline std::size_t
 
839 -
epoll_scheduler::run_one()
 
840 -
{
 
841 -
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
 
842 -
    {
 
843 -
        stop();
 
844 -
        return 0;
 
845 -
    }
 
846 -

 
847 -
    epoll::thread_context_guard ctx(this);
 
848 -
    std::unique_lock lock(mutex_);
 
849 -
    return do_one(lock, -1, &ctx.frame_);
 
850 -
}
 
851 -

 
852 -
inline std::size_t
 
853 -
epoll_scheduler::wait_one(long usec)
 
854 -
{
 
855 -
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
 
856 -
    {
 
857 -
        stop();
 
858 -
        return 0;
 
859 -
    }
 
860 -

 
861 -
    epoll::thread_context_guard ctx(this);
 
862 -
    std::unique_lock lock(mutex_);
 
863 -
    return do_one(lock, usec, &ctx.frame_);
 
864 -
}
 
865 -

 
866 -
inline std::size_t
 
867 -
epoll_scheduler::poll()
 
868 -
{
 
869 -
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
 
870 -
    {
 
871 -
        stop();
 
872 -
        return 0;
 
873 -
    }
 
874 -

 
875 -
    epoll::thread_context_guard ctx(this);
 
876 -
    std::unique_lock lock(mutex_);
 
877 -

 
878 -
    std::size_t n = 0;
 
879 -
    for (;;)
 
880 -
    {
 
881 -
        if (!do_one(lock, 0, &ctx.frame_))
 
882 -
            break;
 
883 -
        if (n != (std::numeric_limits<std::size_t>::max)())
 
884 -
            ++n;
 
885 -
        if (!lock.owns_lock())
 
886 -
            lock.lock();
 
887 -
    }
 
888 -
    return n;
 
889 -
}
 
890 -

 
891 -
inline std::size_t
 
892 -
epoll_scheduler::poll_one()
 
893 -
{
 
894 -
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
 
895 -
    {
 
896 -
        stop();
 
897 -
        return 0;
 
898 -
    }
 
899 -

 
900 -
    epoll::thread_context_guard ctx(this);
 
901 -
    std::unique_lock lock(mutex_);
 
902 -
    return do_one(lock, 0, &ctx.frame_);
 
903 -
}
 
904 -

 
905 -
inline void
 
906  
inline void
216  
inline void
907  
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
217  
epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
908  
{
218  
{
909  
    epoll_event ev{};
219  
    epoll_event ev{};
910  
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
220  
    ev.events   = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
911  
    ev.data.ptr = desc;
221  
    ev.data.ptr = desc;
912  

222  

913  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
223  
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
914  
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
224  
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
915  

225  

916  
    desc->registered_events = ev.events;
226  
    desc->registered_events = ev.events;
917  
    desc->fd                = fd;
227  
    desc->fd                = fd;
918  
    desc->scheduler_        = this;
228  
    desc->scheduler_        = this;
 
229 +
    desc->ready_events_.store(0, std::memory_order_relaxed);
919  

230  

920  
    std::lock_guard lock(desc->mutex);
231  
    std::lock_guard lock(desc->mutex);
 
232 +
    desc->impl_ref_.reset();
921  
    desc->read_ready  = false;
233  
    desc->read_ready  = false;
922  
    desc->write_ready = false;
234  
    desc->write_ready = false;
923  
}
235  
}
924  

236  

925  
inline void
237  
inline void
926  
epoll_scheduler::deregister_descriptor(int fd) const
{
    // Best-effort removal from the epoll interest set; the return value
    // is deliberately ignored (the fd may already have been closed,
    // which removes it from epoll implicitly).
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}
930  

242  

931 -
epoll_scheduler::work_started() noexcept
 
932 -
{
 
933 -
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
 
934 -
}
 
935 -

 
936 -
inline void
 
937 -
epoll_scheduler::work_finished() noexcept
 
938 -
{
 
939 -
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
 
940 -
        stop();
 
941 -
}
 
942 -

 
943 -
inline void
 
944 -
epoll_scheduler::compensating_work_started() const noexcept
 
945 -
{
 
946 -
    auto* ctx = epoll::find_context(this);
 
947 -
    if (ctx)
 
948 -
        ++ctx->private_outstanding_work;
 
949 -
}
 
950 -

 
951 -
inline void
 
952 -
epoll_scheduler::drain_thread_queue(op_queue& queue, long count) const
 
953 -
{
 
954 -
    // Note: outstanding_work_ was already incremented when posting
 
955 -
    std::unique_lock lock(mutex_);
 
956 -
    completed_ops_.splice(queue);
 
957 -
    if (count > 0)
 
958 -
        maybe_unlock_and_signal_one(lock);
 
959 -
}
 
960 -

 
961 -
inline void
 
962 -
epoll_scheduler::post_deferred_completions(op_queue& ops) const
 
963 -
{
 
964 -
    if (ops.empty())
 
965 -
        return;
 
966 -

 
967 -
    // Fast path: if on scheduler thread, use private queue
 
968 -
    if (auto* ctx = epoll::find_context(this))
 
969 -
    {
 
970 -
        ctx->private_queue.splice(ops);
 
971 -
        return;
 
972 -
    }
 
973 -

 
974 -
    // Slow path: add to global queue and wake a thread
 
975 -
    std::unique_lock lock(mutex_);
 
976 -
    completed_ops_.splice(ops);
 
977 -
    wake_one_thread_and_unlock(lock);
 
978 -
}
 
979 -

 
980 -
inline void
 
981  
inline void
243  
inline void
982  
epoll_scheduler::interrupt_reactor() const
244  
epoll_scheduler::interrupt_reactor() const
983 -
    // Only write if not already armed to avoid redundant writes
 
984  
{
245  
{
985  
    bool expected = false;
246  
    bool expected = false;
986  
    if (eventfd_armed_.compare_exchange_strong(
247  
    if (eventfd_armed_.compare_exchange_strong(
987  
            expected, true, std::memory_order_release,
248  
            expected, true, std::memory_order_release,
988  
            std::memory_order_relaxed))
249  
            std::memory_order_relaxed))
989  
    {
250  
    {
990  
        std::uint64_t val       = 1;
251  
        std::uint64_t val       = 1;
991  
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
252  
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
992  
    }
253  
    }
993  
}
254  
}
994  

255  

995 -
epoll_scheduler::signal_all(std::unique_lock<std::mutex>&) const
 
996 -
{
 
997 -
    state_ |= 1;
 
998 -
    cond_.notify_all();
 
999 -
}
 
1000 -

 
1001 -
inline bool
 
1002 -
epoll_scheduler::maybe_unlock_and_signal_one(
 
1003 -
    std::unique_lock<std::mutex>& lock) const
 
1004 -
{
 
1005 -
    state_ |= 1;
 
1006 -
    if (state_ > 1)
 
1007 -
    {
 
1008 -
        lock.unlock();
 
1009 -
        cond_.notify_one();
 
1010 -
        return true;
 
1011 -
    }
 
1012 -
    return false;
 
1013 -
}
 
1014 -

 
1015 -
inline bool
 
1016 -
epoll_scheduler::unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
 
1017 -
{
 
1018 -
    state_ |= 1;
 
1019 -
    bool have_waiters = state_ > 1;
 
1020 -
    lock.unlock();
 
1021 -
    if (have_waiters)
 
1022 -
        cond_.notify_one();
 
1023 -
    return have_waiters;
 
1024 -
}
 
1025 -

 
1026 -
inline void
 
1027 -
epoll_scheduler::clear_signal() const
 
1028 -
{
 
1029 -
    state_ &= ~std::size_t(1);
 
1030 -
}
 
1031 -

 
1032 -
inline void
 
1033 -
epoll_scheduler::wait_for_signal(std::unique_lock<std::mutex>& lock) const
 
1034 -
{
 
1035 -
    while ((state_ & 1) == 0)
 
1036 -
    {
 
1037 -
        state_ += 2;
 
1038 -
        cond_.wait(lock);
 
1039 -
        state_ -= 2;
 
1040 -
    }
 
1041 -
}
 
1042 -

 
1043 -
inline void
 
1044 -
epoll_scheduler::wait_for_signal_for(
 
1045 -
    std::unique_lock<std::mutex>& lock, long timeout_us) const
 
1046 -
{
 
1047 -
    if ((state_ & 1) == 0)
 
1048 -
    {
 
1049 -
        state_ += 2;
 
1050 -
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
 
1051 -
        state_ -= 2;
 
1052 -
    }
 
1053 -
}
 
1054 -

 
1055 -
inline void
 
1056 -
epoll_scheduler::wake_one_thread_and_unlock(
 
1057 -
    std::unique_lock<std::mutex>& lock) const
 
1058 -
{
 
1059 -
    if (maybe_unlock_and_signal_one(lock))
 
1060 -
        return;
 
1061 -

 
1062 -
    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
 
1063 -
    {
 
1064 -
        task_interrupted_ = true;
 
1065 -
        lock.unlock();
 
1066 -
        interrupt_reactor();
 
1067 -
    }
 
1068 -
    else
 
1069 -
    {
 
1070 -
        lock.unlock();
 
1071 -
    }
 
1072 -
}
 
1073 -

 
1074 -
inline epoll_scheduler::work_cleanup::~work_cleanup()
 
1075 -
{
 
1076 -
    if (ctx)
 
1077 -
    {
 
1078 -
        long produced = ctx->private_outstanding_work;
 
1079 -
        if (produced > 1)
 
1080 -
            scheduler->outstanding_work_.fetch_add(
 
1081 -
                produced - 1, std::memory_order_relaxed);
 
1082 -
        else if (produced < 1)
 
1083 -
            scheduler->work_finished();
 
1084 -
        ctx->private_outstanding_work = 0;
 
1085 -

 
1086 -
        if (!ctx->private_queue.empty())
 
1087 -
        {
 
1088 -
            lock->lock();
 
1089 -
            scheduler->completed_ops_.splice(ctx->private_queue);
 
1090 -
        }
 
1091 -
    }
 
1092 -
    else
 
1093 -
    {
 
1094 -
        scheduler->work_finished();
 
1095 -
    }
 
1096 -
}
 
1097 -

 
1098 -
inline epoll_scheduler::task_cleanup::~task_cleanup()
 
1099 -
{
 
1100 -
    if (!ctx)
 
1101 -
        return;
 
1102 -

 
1103 -
    if (ctx->private_outstanding_work > 0)
 
1104 -
    {
 
1105 -
        scheduler->outstanding_work_.fetch_add(
 
1106 -
            ctx->private_outstanding_work, std::memory_order_relaxed);
 
1107 -
        ctx->private_outstanding_work = 0;
 
1108 -
    }
 
1109 -

 
1110 -
    if (!ctx->private_queue.empty())
 
1111 -
    {
 
1112 -
        if (!lock->owns_lock())
 
1113 -
            lock->lock();
 
1114 -
        scheduler->completed_ops_.splice(ctx->private_queue);
 
1115 -
    }
 
1116 -
}
 
1117 -

 
1118 -
inline void
 
1119  
inline void
256  
inline void
1120  
epoll_scheduler::update_timerfd() const
257  
epoll_scheduler::update_timerfd() const
1121  
{
258  
{
1122  
    auto nearest = timer_svc_->nearest_expiry();
259  
    auto nearest = timer_svc_->nearest_expiry();
1123  

260  

1124  
    itimerspec ts{};
261  
    itimerspec ts{};
1125  
    int flags = 0;
262  
    int flags = 0;
1126  

263  

1127  
    if (nearest == timer_service::time_point::max())
264  
    if (nearest == timer_service::time_point::max())
1128  
    {
265  
    {
1129 -
        // No timers - disarm by setting to 0 (relative)
266 +
        // No timers — disarm by setting to 0 (relative)
1130  
    }
267  
    }
1131  
    else
268  
    else
1132  
    {
269  
    {
1133  
        auto now = std::chrono::steady_clock::now();
270  
        auto now = std::chrono::steady_clock::now();
1134  
        if (nearest <= now)
271  
        if (nearest <= now)
1135  
        {
272  
        {
1136 -
            // Use 1ns instead of 0 - zero disarms the timerfd
273 +
            // Use 1ns instead of 0 — zero disarms the timerfd
1137  
            ts.it_value.tv_nsec = 1;
274  
            ts.it_value.tv_nsec = 1;
1138  
        }
275  
        }
1139  
        else
276  
        else
1140  
        {
277  
        {
1141  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
278  
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
1142  
                            nearest - now)
279  
                            nearest - now)
1143  
                            .count();
280  
                            .count();
1144  
            ts.it_value.tv_sec  = nsec / 1000000000;
281  
            ts.it_value.tv_sec  = nsec / 1000000000;
1145 -
            // Ensure non-zero to avoid disarming if duration rounds to 0
 
1146  
            ts.it_value.tv_nsec = nsec % 1000000000;
282  
            ts.it_value.tv_nsec = nsec % 1000000000;
1147  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
283  
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
1148  
                ts.it_value.tv_nsec = 1;
284  
                ts.it_value.tv_nsec = 1;
1149  
        }
285  
        }
1150  
    }
286  
    }
1151  

287  

1152  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
288  
    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
1153  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
289  
        detail::throw_system_error(make_err(errno), "timerfd_settime");
1154  
}
290  
}
1155  

291  

1156  
inline void
epoll_scheduler::run_task(std::unique_lock<std::mutex>& lock, context_type* ctx)
{
    // If an interrupt was requested, poll instead of blocking so the
    // caller regains control promptly.
    int timeout_ms = task_interrupted_ ? 0 : -1;

    // The event loop below runs without the scheduler mutex held.
    if (lock.owns_lock())
        lock.unlock();

    // On scope exit, reconciles per-thread work counts and splices the
    // private queue back into the shared state (see task_cleanup).
    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    // EINTR is benign (signal delivery); anything else is fatal.
    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    for (int i = 0; i < nfds; ++i)
    {
        // data.ptr == nullptr tags the wakeup eventfd: drain it and
        // re-open the arm-once gate used by interrupt_reactor().
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        // data.ptr == &timer_fd_ tags the timerfd: consume the
        // expiration count and process timers after the event sweep.
        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r =
                ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // I/O readiness: record the events on the descriptor and enqueue
        // it at most once — the is_enqueued_ CAS prevents double-push
        // when several events arrive for the same descriptor.
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(
                expected, true, std::memory_order_release,
                std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Timers are only examined when the timerfd actually fired; the
    // timerfd is then re-armed for the next nearest expiry.
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
1304  

359  

1305  
} // namespace boost::corosio::detail
360  
} // namespace boost::corosio::detail
1306  

361  

1307  
#endif // BOOST_COROSIO_HAS_EPOLL
362  
#endif // BOOST_COROSIO_HAS_EPOLL
1308  

363  

1309  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
364  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP