1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_SELECT
15  
#if BOOST_COROSIO_HAS_SELECT
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  

19  

20 -
#include <boost/corosio/native/native_scheduler.hpp>
20 +
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21 -
#include <boost/corosio/detail/scheduler_op.hpp>
 
22  

21  

23  
#include <boost/corosio/native/detail/select/select_op.hpp>
22  
#include <boost/corosio/native/detail/select/select_op.hpp>
24  
#include <boost/corosio/detail/timer_service.hpp>
23  
#include <boost/corosio/detail/timer_service.hpp>
25  
#include <boost/corosio/native/detail/make_err.hpp>
24  
#include <boost/corosio/native/detail/make_err.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
25  
#include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
27  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
26  
#include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
28  

27  

29 -
#include <boost/corosio/detail/thread_local_ptr.hpp>
 
30  
#include <boost/corosio/detail/except.hpp>
28  
#include <boost/corosio/detail/except.hpp>
31  

29  

32 -
#include <sys/socket.h>
 
33  
#include <sys/select.h>
30  
#include <sys/select.h>
34  
#include <unistd.h>
31  
#include <unistd.h>
35  
#include <errno.h>
32  
#include <errno.h>
36  
#include <fcntl.h>
33  
#include <fcntl.h>
37 -
#include <algorithm>
 
38  

34  

39  
#include <atomic>
35  
#include <atomic>
40  
#include <chrono>
36  
#include <chrono>
41 -
#include <condition_variable>
37 +
#include <cstdint>
42 -
#include <cstddef>
 
43  
#include <limits>
38  
#include <limits>
44  
#include <mutex>
39  
#include <mutex>
45  
#include <unordered_map>
40  
#include <unordered_map>
46  

41  

47  
namespace boost::corosio::detail {
42  
namespace boost::corosio::detail {
48  

43  

49  
struct select_op;
44  
struct select_op;
 
45 +
struct select_descriptor_state;
50  

46  

51  
/** POSIX scheduler using select() for I/O multiplexing.
47  
/** POSIX scheduler using select() for I/O multiplexing.
52  

48  

53  
    This scheduler implements the scheduler interface using the POSIX select()
49  
    This scheduler implements the scheduler interface using the POSIX select()
54 -
    call for I/O event notification. It uses a single reactor model
50 +
    call for I/O event notification. It inherits the shared reactor threading
55 -
    where one thread runs select() while other threads wait on a condition
51 +
    model from reactor_scheduler_base: signal state machine, inline completion
56 -
    variable for handler work. This design provides:
52 +
    budget, work counting, and the do_one event loop.
57 -

 
58 -
    - Handler parallelism: N posted handlers can execute on N threads
 
59 -
    - No thundering herd: condition_variable wakes exactly one thread
 
60 -
    - Portability: Works on all POSIX systems
 
61  

53  

62  
    The design mirrors epoll_scheduler for behavioral consistency:
54  
    The design mirrors epoll_scheduler for behavioral consistency:
63  
    - Same single-reactor thread coordination model
55  
    - Same single-reactor thread coordination model
64 -
    - Same work counting semantics
56 +
    - Same deferred I/O pattern (reactor marks ready; workers do I/O)
65  
    - Same timer integration pattern
57  
    - Same timer integration pattern
66  

58  

67  
    Known Limitations:
59  
    Known Limitations:
68  
    - FD_SETSIZE (~1024) limits maximum concurrent connections
60  
    - FD_SETSIZE (~1024) limits maximum concurrent connections
69  
    - O(n) scanning: rebuilds fd_sets each iteration
61  
    - O(n) scanning: rebuilds fd_sets each iteration
70  
    - Level-triggered only (no edge-triggered mode)
62  
    - Level-triggered only (no edge-triggered mode)
71  

63  

72  
    @par Thread Safety
64  
    @par Thread Safety
73  
    All public member functions are thread-safe.
65  
    All public member functions are thread-safe.
74  
*/
66  
*/
75 -
class BOOST_COROSIO_DECL select_scheduler final
67 +
class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler_base
76 -
    : public native_scheduler
 
77 -
    , public capy::execution_context::service
 
78  
{
68  
{
79 -
    using key_type = scheduler;
 
80 -

 
81  
public:
69  
public:
82  
    /** Construct the scheduler.
70  
    /** Construct the scheduler.
83  

71  

84  
        Creates a self-pipe for reactor interruption.
72  
        Creates a self-pipe for reactor interruption.
85  

73  

86  
        @param ctx Reference to the owning execution_context.
74  
        @param ctx Reference to the owning execution_context.
87  
        @param concurrency_hint Hint for expected thread count (unused).
75  
        @param concurrency_hint Hint for expected thread count (unused).
88  
    */
76  
    */
89  
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
77  
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
90  

78  

 
79 +
    /// Destroy the scheduler.
91  
    ~select_scheduler() override;
80  
    ~select_scheduler() override;
92  

81  

93  
    select_scheduler(select_scheduler const&)            = delete;
82  
    select_scheduler(select_scheduler const&)            = delete;
94  
    select_scheduler& operator=(select_scheduler const&) = delete;
83  
    select_scheduler& operator=(select_scheduler const&) = delete;
95  

84  

 
85 +
    /// Shut down the scheduler, draining pending operations.
96 -
    void post(std::coroutine_handle<> h) const override;
 
97 -
    void post(scheduler_op* h) const override;
 
98 -
    bool running_in_this_thread() const noexcept override;
 
99 -
    void stop() override;
 
100 -
    bool stopped() const noexcept override;
 
101 -
    void restart() override;
 
102 -
    std::size_t run() override;
 
103 -
    std::size_t run_one() override;
 
104 -
    std::size_t wait_one(long usec) override;
 
105 -
    std::size_t poll() override;
 
106 -
    std::size_t poll_one() override;
 
107  
    void shutdown() override;
86  
    void shutdown() override;
108  

87  

109  
    /** Return the maximum file descriptor value supported.
88  
    /** Return the maximum file descriptor value supported.
110  

89  

111  
        Returns FD_SETSIZE - 1, the maximum fd value that can be
90  
        Returns FD_SETSIZE - 1, the maximum fd value that can be
112  
        monitored by select(). Operations with fd >= FD_SETSIZE
91  
        monitored by select(). Operations with fd >= FD_SETSIZE
113  
        will fail with EINVAL.
92  
        will fail with EINVAL.
114  

93  

115  
        @return The maximum supported file descriptor value.
94  
        @return The maximum supported file descriptor value.
116  
    */
95  
    */
117  
    static constexpr int max_fd() noexcept
96  
    static constexpr int max_fd() noexcept
118  
    {
97  
    {
119  
        return FD_SETSIZE - 1;
98  
        return FD_SETSIZE - 1;
120  
    }
99  
    }
121  

100  

122 -
    /** Register a file descriptor for monitoring.
101 +
    /** Register a descriptor for persistent monitoring.
 
102 +

 
103 +
        The fd is added to the registered_descs_ map and will be
 
104 +
        included in subsequent select() calls. The reactor is
 
105 +
        interrupted so a blocked select() rebuilds its fd_sets.
123  

106  

124  
        @param fd The file descriptor to register.
107  
        @param fd The file descriptor to register.
125 -
        @param op The operation associated with this fd.
108 +
        @param desc Pointer to descriptor state for this fd.
126 -
        @param events Event mask: 1 = read, 2 = write, 3 = both.
 
127  
    */
109  
    */
128 -
    void register_fd(int fd, select_op* op, int events) const;
110 +
    void register_descriptor(int fd, select_descriptor_state* desc) const;
129  

111  

130 -
    /** Unregister a file descriptor from monitoring.
112 +
    /** Deregister a persistently registered descriptor.
131  

113  

132 -
        @param fd The file descriptor to unregister.
114 +
        @param fd The file descriptor to deregister.
133 -
        @param events Event mask to remove: 1 = read, 2 = write, 3 = both.
 
134  
    */
115  
    */
135 -
    void deregister_fd(int fd, int events) const;
116 +
    void deregister_descriptor(int fd) const;
136  

117  

137 -
    void work_started() noexcept override;
118 +
    /** Interrupt the reactor so it rebuilds its fd_sets.
138 -
    void work_finished() noexcept override;
 
139  

119  

140 -
    // Event flags for register_fd/deregister_fd
120 +
        Called when a write or connect op is registered after
141 -
    static constexpr int event_read  = 1;
121 +
        the reactor's snapshot was taken. Without this, select()
142 -
    static constexpr int event_write = 2;
122 +
        may block not watching for writability on the fd.
 
123 +
    */
 
124 +
    void notify_reactor() const;
143  

125  

144  
private:
126  
private:
145 -
    std::size_t do_one(long timeout_us);
127 +
    void
146 -
    void run_reactor(std::unique_lock<std::mutex>& lock);
128 +
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
147 -
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
129 +
    void interrupt_reactor() const override;
148 -
    void interrupt_reactor() const;
 
149  
    long calculate_timeout(long requested_timeout_us) const;
130  
    long calculate_timeout(long requested_timeout_us) const;
150  

131  

151  
    // Self-pipe for interrupting select()
132  
    // Self-pipe for interrupting select()
152  
    int pipe_fds_[2]; // [0]=read, [1]=write
133  
    int pipe_fds_[2]; // [0]=read, [1]=write
153  

134  

154 -
    mutable std::mutex mutex_;
135 +
    // Per-fd tracking for fd_set building
155 -
    mutable std::condition_variable wakeup_event_;
136 +
    mutable std::unordered_map<int, select_descriptor_state*> registered_descs_;
156 -
    mutable op_queue completed_ops_;
 
157 -
    mutable std::atomic<long> outstanding_work_;
 
158 -
    std::atomic<bool> stopped_;
 
159 -

 
160 -
    // Per-fd state for tracking registered operations
 
161 -
    struct fd_state
 
162 -
    {
 
163 -
        select_op* read_op  = nullptr;
 
164 -
        select_op* write_op = nullptr;
 
165 -
    };
 
166 -
    mutable std::unordered_map<int, fd_state> registered_fds_;
 
167 -

 
168 -
    // Single reactor thread coordination
 
169 -
    mutable bool reactor_running_     = false;
 
170 -
    mutable bool reactor_interrupted_ = false;
 
171 -
    mutable int idle_thread_count_    = 0;
 
172 -

 
173 -
    // Sentinel operation for interleaving reactor runs with handler execution.
 
174 -
    // Ensures the reactor runs periodically even when handlers are continuously
 
175 -
    // posted, preventing timer starvation.
 
176 -
    struct task_op final : scheduler_op
 
177 -
    {
 
178 -
        void operator()() override {}
 
179 -
        void destroy() override {}
 
180 -
    };
 
181 -
    task_op task_op_;
 
182  
    mutable int max_fd_ = -1;
137  
    mutable int max_fd_ = -1;
183  
};
138  
};
184 -
/*
 
185 -
    select Scheduler - Single Reactor Model
 
186 -
    =======================================
 
187 -

 
188 -
    This scheduler mirrors the epoll_scheduler design but uses select() instead
 
189 -
    of epoll for I/O multiplexing. The thread coordination strategy is identical:
 
190 -
    one thread becomes the "reactor" while others wait on a condition variable.
 
191 -

 
192 -
    Thread Model
 
193 -
    ------------
 
194 -
    - ONE thread runs select() at a time (the reactor thread)
 
195 -
    - OTHER threads wait on wakeup_event_ (condition variable) for handlers
 
196 -
    - When work is posted, exactly one waiting thread wakes via notify_one()
 
197 -

 
198 -
    Key Differences from epoll
 
199 -
    --------------------------
 
200 -
    - Uses self-pipe instead of eventfd for interruption (more portable)
 
201 -
    - fd_set rebuilding each iteration (O(n) vs O(1) for epoll)
 
202 -
    - FD_SETSIZE limit (~1024 fds on most systems)
 
203 -
    - Level-triggered only (no edge-triggered mode)
 
204 -

 
205 -
    Self-Pipe Pattern
 
206 -
    -----------------
 
207 -
    To interrupt a blocking select() call (e.g., when work is posted or a timer
 
208 -
    expires), we write a byte to pipe_fds_[1]. The read end pipe_fds_[0] is
 
209 -
    always in the read_fds set, so select() returns immediately. We drain the
 
210 -
    pipe to clear the readable state.
 
211 -

 
212 -
    fd-to-op Mapping
 
213 -
    ----------------
 
214 -
    We use an unordered_map<int, fd_state> to track which operations are
 
215 -
    registered for each fd. This allows O(1) lookup when select() returns
 
216 -
    ready fds. Each fd can have at most one read op and one write op registered.
 
217 -
*/
 
218 -

 
219 -
namespace select {
 
220 -

 
221 -
struct BOOST_COROSIO_SYMBOL_VISIBLE scheduler_context
 
222 -
{
 
223 -
    select_scheduler const* key;
 
224 -
    scheduler_context* next;
 
225 -
};
 
226 -

 
227 -
inline thread_local_ptr<scheduler_context> context_stack;
 
228 -

 
229 -
struct thread_context_guard
 
230 -
{
 
231 -
    scheduler_context frame_;
 
232 -

 
233 -
    explicit thread_context_guard(select_scheduler const* ctx) noexcept
 
234 -
        : frame_{ctx, context_stack.get()}
 
235 -
    {
 
236 -
        context_stack.set(&frame_);
 
237 -
    }
 
238 -

 
239 -
    ~thread_context_guard() noexcept
 
240 -
    {
 
241 -
        context_stack.set(frame_.next);
 
242 -
    }
 
243 -
};
 
244 -

 
245 -
struct work_guard
 
246 -
{
 
247 -
    select_scheduler* self;
 
248 -
    ~work_guard()
 
249 -
    {
 
250 -
        self->work_finished();
 
251 -
    }
 
252 -
};
 
253 -

 
254 -
} // namespace select
 
255 -

 
256  

139  

257  
inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
140  
inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
258 -
    , outstanding_work_(0)
 
259 -
    , stopped_(false)
 
260  
    : pipe_fds_{-1, -1}
141  
    : pipe_fds_{-1, -1}
261 -
    , reactor_running_(false)
 
262 -
    , reactor_interrupted_(false)
 
263 -
    , idle_thread_count_(0)
 
264  
    , max_fd_(-1)
142  
    , max_fd_(-1)
265 -
    // Create self-pipe for interrupting select()
 
266  
{
143  
{
267  
    if (::pipe(pipe_fds_) < 0)
144  
    if (::pipe(pipe_fds_) < 0)
268  
        detail::throw_system_error(make_err(errno), "pipe");
145  
        detail::throw_system_error(make_err(errno), "pipe");
269 -
    // Set both ends to non-blocking and close-on-exec
 
270  

146  

271  
    for (int i = 0; i < 2; ++i)
147  
    for (int i = 0; i < 2; ++i)
272  
    {
148  
    {
273  
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
149  
        int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
274  
        if (flags == -1)
150  
        if (flags == -1)
275  
        {
151  
        {
276  
            int errn = errno;
152  
            int errn = errno;
277  
            ::close(pipe_fds_[0]);
153  
            ::close(pipe_fds_[0]);
278  
            ::close(pipe_fds_[1]);
154  
            ::close(pipe_fds_[1]);
279  
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
155  
            detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
280  
        }
156  
        }
281  
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
157  
        if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
282  
        {
158  
        {
283  
            int errn = errno;
159  
            int errn = errno;
284  
            ::close(pipe_fds_[0]);
160  
            ::close(pipe_fds_[0]);
285  
            ::close(pipe_fds_[1]);
161  
            ::close(pipe_fds_[1]);
286  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
162  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
287  
        }
163  
        }
288  
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
164  
        if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
289  
        {
165  
        {
290  
            int errn = errno;
166  
            int errn = errno;
291  
            ::close(pipe_fds_[0]);
167  
            ::close(pipe_fds_[0]);
292  
            ::close(pipe_fds_[1]);
168  
            ::close(pipe_fds_[1]);
293  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
169  
            detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
294  
        }
170  
        }
295  
    }
171  
    }
296  

172  

297  
    timer_svc_ = &get_timer_service(ctx, *this);
173  
    timer_svc_ = &get_timer_service(ctx, *this);
298  
    timer_svc_->set_on_earliest_changed(
174  
    timer_svc_->set_on_earliest_changed(
299  
        timer_service::callback(this, [](void* p) {
175  
        timer_service::callback(this, [](void* p) {
300  
            static_cast<select_scheduler*>(p)->interrupt_reactor();
176  
            static_cast<select_scheduler*>(p)->interrupt_reactor();
301  
        }));
177  
        }));
302 -
    // Initialize resolver service
 
303  

178  

304 -

 
305 -
    // Initialize signal service
 
306  
    get_resolver_service(ctx, *this);
179  
    get_resolver_service(ctx, *this);
307  
    get_signal_service(ctx, *this);
180  
    get_signal_service(ctx, *this);
308 -
    // Push task sentinel to interleave reactor runs with handler execution
 
309  

181  

310  
    completed_ops_.push(&task_op_);
182  
    completed_ops_.push(&task_op_);
311  
}
183  
}
312  

184  

313  
inline select_scheduler::~select_scheduler()
185  
inline select_scheduler::~select_scheduler()
314  
{
186  
{
315  
    if (pipe_fds_[0] >= 0)
187  
    if (pipe_fds_[0] >= 0)
316  
        ::close(pipe_fds_[0]);
188  
        ::close(pipe_fds_[0]);
317  
    if (pipe_fds_[1] >= 0)
189  
    if (pipe_fds_[1] >= 0)
318  
        ::close(pipe_fds_[1]);
190  
        ::close(pipe_fds_[1]);
319  
}
191  
}
320  

192  

321  
inline void
193  
inline void
322  
select_scheduler::shutdown()
194  
select_scheduler::shutdown()
323  
{
195  
{
324 -
    {
196 +
    shutdown_drain();
325 -
        std::unique_lock lock(mutex_);
 
326 -

 
327 -
        while (auto* h = completed_ops_.pop())
 
328 -
        {
 
329 -
            if (h == &task_op_)
 
330 -
                continue;
 
331 -
            lock.unlock();
 
332 -
            h->destroy();
 
333 -
            lock.lock();
 
334 -
        }
 
335 -
    }
 
336  

197  

337  
    if (pipe_fds_[1] >= 0)
198  
    if (pipe_fds_[1] >= 0)
338 -

 
339 -
    wakeup_event_.notify_all();
 
340  
        interrupt_reactor();
199  
        interrupt_reactor();
341  
}
200  
}
342  

201  

343  
inline void
202  
inline void
344 -
select_scheduler::post(std::coroutine_handle<> h) const
203 +
select_scheduler::register_descriptor(
345 -
{
204 +
    int fd, select_descriptor_state* desc) const
346 -
    struct post_handler final : scheduler_op
 
347 -
    {
 
348 -
        std::coroutine_handle<> h_;
 
349 -

 
350 -
        explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
 
351 -

 
352 -
        ~post_handler() override = default;
 
353 -

 
354 -
        void operator()() override
 
355 -
        {
 
356 -
            auto h = h_;
 
357 -
            delete this;
 
358 -
            h.resume();
 
359 -
        }
 
360 -

 
361 -
        void destroy() override
 
362 -
        {
 
363 -
            auto h = h_;
 
364 -
            delete this;
 
365 -
            h.destroy();
 
366 -
        }
 
367 -
    };
 
368 -

 
369 -
    auto ph = std::make_unique<post_handler>(h);
 
370 -
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
 
371 -

 
372 -
    std::unique_lock lock(mutex_);
 
373 -
    completed_ops_.push(ph.release());
 
374 -
    wake_one_thread_and_unlock(lock);
 
375 -
}
 
376 -

 
377 -
inline void
 
378 -
select_scheduler::post(scheduler_op* h) const
 
379 -
{
 
380 -
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
 
381 -

 
382 -
    std::unique_lock lock(mutex_);
 
383 -
    completed_ops_.push(h);
 
384 -
    wake_one_thread_and_unlock(lock);
 
385 -
}
 
386 -

 
387 -
inline bool
 
388 -
select_scheduler::running_in_this_thread() const noexcept
 
389 -
{
 
390 -
    for (auto* c = select::context_stack.get(); c != nullptr; c = c->next)
 
391 -
        if (c->key == this)
 
392 -
            return true;
 
393 -
    return false;
 
394 -
}
 
395 -

 
396 -
inline void
 
397 -
select_scheduler::stop()
 
398 -
{
 
399 -
    bool expected = false;
 
400 -
    if (stopped_.compare_exchange_strong(
 
401 -
            expected, true, std::memory_order_release,
 
402 -
            std::memory_order_relaxed))
 
403 -
    {
 
404 -
        // Wake all threads so they notice stopped_ and exit
 
405 -
        {
 
406 -
            std::lock_guard lock(mutex_);
 
407 -
            wakeup_event_.notify_all();
 
408 -
        }
 
409 -
        interrupt_reactor();
 
410 -
    }
 
411 -
}
 
412 -

 
413 -
inline bool
 
414 -
select_scheduler::stopped() const noexcept
 
415 -
{
 
416 -
    return stopped_.load(std::memory_order_acquire);
 
417 -
}
 
418 -

 
419 -
inline void
 
420 -
select_scheduler::restart()
 
421 -
{
 
422 -
    stopped_.store(false, std::memory_order_release);
 
423 -
}
 
424 -

 
425 -
inline std::size_t
 
426 -
select_scheduler::run()
 
427 -
{
 
428 -
    if (stopped_.load(std::memory_order_acquire))
 
429 -
        return 0;
 
430 -

 
431 -
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
 
432 -
    {
 
433 -
        stop();
 
434 -
        return 0;
 
435 -
    }
 
436 -

 
437 -
    select::thread_context_guard ctx(this);
 
438 -

 
439 -
    std::size_t n = 0;
 
440 -
    while (do_one(-1))
 
441 -
        if (n != (std::numeric_limits<std::size_t>::max)())
 
442 -
            ++n;
 
443 -
    return n;
 
444 -
}
 
445 -

 
446 -
inline std::size_t
 
447 -
select_scheduler::run_one()
 
448 -
{
 
449 -
    if (stopped_.load(std::memory_order_acquire))
 
450 -
        return 0;
 
451 -

 
452 -
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
 
453 -
    {
 
454 -
        stop();
 
455 -
        return 0;
 
456 -
    }
 
457 -

 
458 -
    select::thread_context_guard ctx(this);
 
459 -
    return do_one(-1);
 
460 -
}
 
461 -

 
462 -
inline std::size_t
 
463 -
select_scheduler::wait_one(long usec)
 
464 -
{
 
465 -
    if (stopped_.load(std::memory_order_acquire))
 
466 -
        return 0;
 
467 -

 
468 -
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
 
469 -
    {
 
470 -
        stop();
 
471 -
        return 0;
 
472 -
    }
 
473 -

 
474 -
    select::thread_context_guard ctx(this);
 
475 -
    return do_one(usec);
 
476 -
}
 
477 -

 
478 -
inline std::size_t
 
479 -
select_scheduler::poll()
 
480  
{
205  
{
481 -
    if (stopped_.load(std::memory_order_acquire))
206 +
    if (fd < 0 || fd >= FD_SETSIZE)
482 -
        return 0;
207 +
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
483 -

 
484 -
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
 
485 -
    {
 
486 -
        stop();
 
487 -
        return 0;
 
488 -
    }
 
489 -

 
490 -
    select::thread_context_guard ctx(this);
 
491 -

 
492 -
    std::size_t n = 0;
 
493 -
    while (do_one(0))
 
494 -
        if (n != (std::numeric_limits<std::size_t>::max)())
 
495 -
            ++n;
 
496 -
    return n;
 
497 -
}
 
498  

208  

499 -
inline std::size_t
209 +
    desc->registered_events = reactor_event_read | reactor_event_write;
500 -
select_scheduler::poll_one()
210 +
    desc->fd                = fd;
501 -
{
211 +
    desc->scheduler_        = this;
502 -
    if (stopped_.load(std::memory_order_acquire))
212 +
    desc->ready_events_.store(0, std::memory_order_relaxed);
503 -
        return 0;
 
504 -
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
 
505  

213  

506  
    {
214  
    {
507 -
        stop();
215 +
        std::lock_guard lock(desc->mutex);
508 -
        return 0;
216 +
        desc->impl_ref_.reset();
 
217 +
        desc->read_ready  = false;
 
218 +
        desc->write_ready = false;
509  
    }
219  
    }
510 -
    select::thread_context_guard ctx(this);
 
511 -
    return do_one(0);
 
512 -
}
 
513 -

 
514 -
inline void
 
515 -
select_scheduler::register_fd(int fd, select_op* op, int events) const
 
516 -
{
 
517 -
    // Validate fd is within select() limits
 
518 -
    if (fd < 0 || fd >= FD_SETSIZE)
 
519 -
        detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
 
520 -

 
521  

220  

522  
    {
221  
    {
523  
        std::lock_guard lock(mutex_);
222  
        std::lock_guard lock(mutex_);
524 -

223 +
        registered_descs_[fd] = desc;
525 -
        auto& state = registered_fds_[fd];
 
526 -
        if (events & event_read)
 
527 -
            state.read_op = op;
 
528 -
        if (events & event_write)
 
529 -
            state.write_op = op;
 
530 -

 
531  
        if (fd > max_fd_)
224  
        if (fd > max_fd_)
532  
            max_fd_ = fd;
225  
            max_fd_ = fd;
533  
    }
226  
    }
534 -
    // Wake the reactor so a thread blocked in select() rebuilds its fd_sets
 
535 -
    // with the newly registered fd.
 
536  

227  

537  
    interrupt_reactor();
228  
    interrupt_reactor();
538  
}
229  
}
539  

230  

540  
inline void
231  
inline void
541 -
select_scheduler::deregister_fd(int fd, int events) const
232 +
select_scheduler::deregister_descriptor(int fd) const
542  
{
233  
{
543  
    std::lock_guard lock(mutex_);
234  
    std::lock_guard lock(mutex_);
544  

235  

545 -
    auto it = registered_fds_.find(fd);
236 +
    auto it = registered_descs_.find(fd);
546 -
    if (it == registered_fds_.end())
237 +
    if (it == registered_descs_.end())
547  
        return;
238  
        return;
548  

239  

549 -
    if (events & event_read)
240 +
    registered_descs_.erase(it);
550 -
        it->second.read_op = nullptr;
 
551 -
    if (events & event_write)
 
552 -
        it->second.write_op = nullptr;
 
553  

241  

554 -
    // Remove entry if both are null
242 +
    if (fd == max_fd_)
555 -
    if (!it->second.read_op && !it->second.write_op)
 
556  
    {
243  
    {
557 -
        registered_fds_.erase(it);
244 +
        max_fd_ = pipe_fds_[0];
558 -

245 +
        for (auto& [registered_fd, state] : registered_descs_)
559 -
        // Recalculate max_fd_ if needed
 
560 -
        if (fd == max_fd_)
 
561  
        {
246  
        {
562 -
            max_fd_ = pipe_fds_[0]; // At minimum, the pipe read end
247 +
            if (registered_fd > max_fd_)
563 -
            for (auto& [registered_fd, state] : registered_fds_)
248 +
                max_fd_ = registered_fd;
564 -
            {
 
565 -
                if (registered_fd > max_fd_)
 
566 -
                    max_fd_ = registered_fd;
 
567 -
            }
 
568  
        }
249  
        }
569  
    }
250  
    }
570  
}
251  
}
571  

252  

572  
inline void
253  
inline void
573 -
select_scheduler::work_started() noexcept
254 +
select_scheduler::notify_reactor() const
574 -
{
 
575 -
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
 
576 -
}
 
577 -

 
578 -
inline void
 
579 -
select_scheduler::work_finished() noexcept
 
580  
{
255  
{
581 -
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
256 +
    interrupt_reactor();
582 -
        stop();
 
583  
}
257  
}
584  

258  

585  
inline void
259  
inline void
586  
select_scheduler::interrupt_reactor() const
260  
select_scheduler::interrupt_reactor() const
587  
{
261  
{
588  
    char byte               = 1;
262  
    char byte               = 1;
589  
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
263  
    [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
590  
}
264  
}
591 -
inline void
 
592 -
select_scheduler::wake_one_thread_and_unlock(
 
593 -
    std::unique_lock<std::mutex>& lock) const
 
594 -
{
 
595 -
    if (idle_thread_count_ > 0)
 
596 -
    {
 
597 -
        // Idle worker exists - wake it via condvar
 
598 -
        wakeup_event_.notify_one();
 
599 -
        lock.unlock();
 
600 -
    }
 
601 -
    else if (reactor_running_ && !reactor_interrupted_)
 
602 -
    {
 
603 -
        // No idle workers but reactor is running - interrupt it
 
604 -
        reactor_interrupted_ = true;
 
605 -
        lock.unlock();
 
606 -
        interrupt_reactor();
 
607 -
    }
 
608 -
    else
 
609 -
    {
 
610 -
        // No one to wake
 
611 -
        lock.unlock();
 
612 -
    }
 
613 -
}
 
614 -

 
615  

265  

616  
inline long
266  
inline long
617  
select_scheduler::calculate_timeout(long requested_timeout_us) const
267  
select_scheduler::calculate_timeout(long requested_timeout_us) const
618  
{
268  
{
619  
    if (requested_timeout_us == 0)
269  
    if (requested_timeout_us == 0)
620  
        return 0;
270  
        return 0;
621  

271  

622  
    auto nearest = timer_svc_->nearest_expiry();
272  
    auto nearest = timer_svc_->nearest_expiry();
623  
    if (nearest == timer_service::time_point::max())
273  
    if (nearest == timer_service::time_point::max())
624  
        return requested_timeout_us;
274  
        return requested_timeout_us;
625  

275  

626  
    auto now = std::chrono::steady_clock::now();
276  
    auto now = std::chrono::steady_clock::now();
627  
    if (nearest <= now)
277  
    if (nearest <= now)
628  
        return 0;
278  
        return 0;
629  

279  

630  
    auto timer_timeout_us =
280  
    auto timer_timeout_us =
631  
        std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
281  
        std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
632  
            .count();
282  
            .count();
633 -
    // Clamp to [0, LONG_MAX] to prevent truncation on 32-bit long platforms
 
634  

283  

635  
    constexpr auto long_max =
284  
    constexpr auto long_max =
636  
        static_cast<long long>((std::numeric_limits<long>::max)());
285  
        static_cast<long long>((std::numeric_limits<long>::max)());
637  
    auto capped_timer_us =
286  
    auto capped_timer_us =
638  
        (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
287  
        (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
639  
                              static_cast<long long>(0)),
288  
                              static_cast<long long>(0)),
640  
                   long_max);
289  
                   long_max);
641  

290  

642  
    if (requested_timeout_us < 0)
291  
    if (requested_timeout_us < 0)
643  
        return static_cast<long>(capped_timer_us);
292  
        return static_cast<long>(capped_timer_us);
644 -
    // requested_timeout_us is already long, so min() result fits in long
 
645  

293  

646  
    return static_cast<long>(
294  
    return static_cast<long>(
647  
        (std::min)(static_cast<long long>(requested_timeout_us),
295  
        (std::min)(static_cast<long long>(requested_timeout_us),
648  
                   capped_timer_us));
296  
                   capped_timer_us));
649  
}
297  
}
650  

298  

651  
inline void
299  
inline void
652 -
select_scheduler::run_reactor(std::unique_lock<std::mutex>& lock)
300 +
select_scheduler::run_task(
 
301 +
    std::unique_lock<std::mutex>& lock, context_type* ctx)
653  
{
302  
{
654 -
    // Calculate timeout considering timers, use 0 if interrupted
303 +
    long effective_timeout_us = task_interrupted_ ? 0 : calculate_timeout(-1);
655 -
    long effective_timeout_us =
304 +

656 -
        reactor_interrupted_ ? 0 : calculate_timeout(-1);
305 +
    // Snapshot registered descriptors while holding lock.
 
306 +
    // Record which fds need write monitoring to avoid a hot loop:
 
307 +
    // select is level-triggered so writable sockets (nearly always
 
308 +
    // writable) would cause select() to return immediately every
 
309 +
    // iteration if unconditionally added to write_fds.
 
310 +
    struct fd_entry
 
311 +
    {
 
312 +
        int fd;
 
313 +
        select_descriptor_state* desc;
 
314 +
        bool needs_write;
 
315 +
    };
 
316 +
    fd_entry snapshot[FD_SETSIZE];
 
317 +
    int snapshot_count = 0;
 
318 +

 
319 +
    for (auto& [fd, desc] : registered_descs_)
 
320 +
    {
 
321 +
        if (snapshot_count < FD_SETSIZE)
 
322 +
        {
 
323 +
            std::lock_guard desc_lock(desc->mutex);
 
324 +
            snapshot[snapshot_count].fd          = fd;
 
325 +
            snapshot[snapshot_count].desc        = desc;
 
326 +
            snapshot[snapshot_count].needs_write =
 
327 +
                (desc->write_op || desc->connect_op);
 
328 +
            ++snapshot_count;
 
329 +
        }
 
330 +
    }
 
331 +

 
332 +
    if (lock.owns_lock())
 
333 +
        lock.unlock();
 
334 +

 
335 +
    task_cleanup on_exit{this, &lock, ctx};
657 -
    // Build fd_sets from registered_fds_
 
658  

336  

659  
    fd_set read_fds, write_fds, except_fds;
337  
    fd_set read_fds, write_fds, except_fds;
660  
    FD_ZERO(&read_fds);
338  
    FD_ZERO(&read_fds);
661  
    FD_ZERO(&write_fds);
339  
    FD_ZERO(&write_fds);
662  
    FD_ZERO(&except_fds);
340  
    FD_ZERO(&except_fds);
663 -
    // Always include the interrupt pipe
 
664  

341  

665  
    FD_SET(pipe_fds_[0], &read_fds);
342  
    FD_SET(pipe_fds_[0], &read_fds);
666  
    int nfds = pipe_fds_[0];
343  
    int nfds = pipe_fds_[0];
667  

344  

668 -
    // Add registered fds
345 +
    for (int i = 0; i < snapshot_count; ++i)
669 -
    for (auto& [fd, state] : registered_fds_)
 
670  
    {
346  
    {
671 -
        if (state.read_op)
347 +
        int fd = snapshot[i].fd;
672 -
            FD_SET(fd, &read_fds);
348 +
        FD_SET(fd, &read_fds);
673 -
        if (state.write_op)
349 +
        if (snapshot[i].needs_write)
674 -
        {
 
675  
            FD_SET(fd, &write_fds);
350  
            FD_SET(fd, &write_fds);
676 -
            // Also monitor for errors on connect operations
351 +
        FD_SET(fd, &except_fds);
677 -
            FD_SET(fd, &except_fds);
 
678 -
        }
 
679  
        if (fd > nfds)
352  
        if (fd > nfds)
680  
            nfds = fd;
353  
            nfds = fd;
681  
    }
354  
    }
682 -
    // Convert timeout to timeval
 
683  

355  

684  
    struct timeval tv;
356  
    struct timeval tv;
685  
    struct timeval* tv_ptr = nullptr;
357  
    struct timeval* tv_ptr = nullptr;
686  
    if (effective_timeout_us >= 0)
358  
    if (effective_timeout_us >= 0)
687  
    {
359  
    {
688  
        tv.tv_sec  = effective_timeout_us / 1000000;
360  
        tv.tv_sec  = effective_timeout_us / 1000000;
689  
        tv.tv_usec = effective_timeout_us % 1000000;
361  
        tv.tv_usec = effective_timeout_us % 1000000;
690  
        tv_ptr     = &tv;
362  
        tv_ptr     = &tv;
691  
    }
363  
    }
692 -
    lock.unlock();
 
693 -

 
694  

364  

695  
    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
365  
    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);
696 -
    int saved_errno = errno;
366 +

 
367 +
    // EINTR: signal interrupted select(), just retry.
 
368 +
    // EBADF: an fd was closed between snapshot and select(); retry
 
369 +
    // with a fresh snapshot from registered_descs_.
 
370 +
    if (ready < 0)
 
371 +
    {
 
372 +
        if (errno == EINTR || errno == EBADF)
 
373 +
            return;
 
374 +
        detail::throw_system_error(make_err(errno), "select");
 
375 +
    }
697  

376  

698  
    // Process timers outside the lock
377  
    // Process timers outside the lock
699  
    timer_svc_->process_expired();
378  
    timer_svc_->process_expired();
700  

379  

701 -
    if (ready < 0 && saved_errno != EINTR)
380 +
    op_queue local_ops;
702 -
        detail::throw_system_error(make_err(saved_errno), "select");
 
703 -

 
704 -
    // Re-acquire lock before modifying completed_ops_
 
705 -
    lock.lock();
 
706 -

 
707 -
    // Drain the interrupt pipe if readable
 
708 -
    if (ready > 0 && FD_ISSET(pipe_fds_[0], &read_fds))
 
709 -
    {
 
710 -
        char buf[256];
 
711 -
        while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
 
712 -
        {
 
713 -
        }
 
714 -
    }
 
715 -
    // Process I/O completions
 
716 -
    int completions_queued = 0;
 
717  

381  

718  
    if (ready > 0)
382  
    if (ready > 0)
719  
    {
383  
    {
720 -
        // Iterate over registered fds (copy keys to avoid iterator invalidation)
384 +
        if (FD_ISSET(pipe_fds_[0], &read_fds))
721 -
        std::vector<int> fds_to_check;
 
722 -
        fds_to_check.reserve(registered_fds_.size());
 
723 -
        for (auto& [fd, state] : registered_fds_)
 
724 -
            fds_to_check.push_back(fd);
 
725 -

 
726 -
        for (int fd : fds_to_check)
 
727  
        {
385  
        {
728 -
            auto it = registered_fds_.find(fd);
386 +
            char buf[256];
729 -
            if (it == registered_fds_.end())
387 +
            while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
730 -
                continue;
 
731 -

 
732 -
            auto& state = it->second;
 
733 -

 
734 -
            // Check for errors (especially for connect operations)
 
735 -
            bool has_error = FD_ISSET(fd, &except_fds);
 
736 -

 
737 -
            // Process read readiness
 
738 -
            if (state.read_op && (FD_ISSET(fd, &read_fds) || has_error))
 
739 -
            {
 
740 -
                auto* op = state.read_op;
 
741 -
                // Claim the op by exchanging to unregistered. Both registering and
 
742 -
                // registered states mean the op is ours to complete.
 
743 -
                auto prev = op->registered.exchange(
 
744 -
                    select_registration_state::unregistered,
 
745 -
                    std::memory_order_acq_rel);
 
746 -
                if (prev != select_registration_state::unregistered)
 
747 -
                {
 
748 -
                    state.read_op = nullptr;
 
749 -

 
750 -
                    if (has_error)
 
751 -
                    {
 
752 -
                        int errn      = 0;
 
753 -
                        socklen_t len = sizeof(errn);
 
754 -
                        if (::getsockopt(
 
755 -
                                fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
 
756 -
                            errn = errno;
 
757 -
                        if (errn == 0)
 
758 -
                            errn = EIO;
 
759 -
                        op->complete(errn, 0);
 
760 -
                    }
 
761 -
                    else
 
762 -
                    {
 
763 -
                        op->perform_io();
 
764 -
                    }
 
765 -

 
766 -
                    completed_ops_.push(op);
 
767 -
                    ++completions_queued;
 
768 -
                }
 
769 -
            }
 
770 -

 
771 -
            // Process write readiness
 
772 -
            if (state.write_op && (FD_ISSET(fd, &write_fds) || has_error))
 
773 -
                auto* op = state.write_op;
 
774 -
                // Claim the op by exchanging to unregistered. Both registering and
 
775 -
                // registered states mean the op is ours to complete.
 
776 -
                auto prev = op->registered.exchange(
 
777 -
                    select_registration_state::unregistered,
 
778 -
                    std::memory_order_acq_rel);
 
779 -
                if (prev != select_registration_state::unregistered)
 
780 -
                {
 
781 -
                    state.write_op = nullptr;
 
782 -

 
783 -
                    if (has_error)
 
784 -
                    {
 
785 -
                        int errn      = 0;
 
786 -
                        socklen_t len = sizeof(errn);
 
787 -
                        if (::getsockopt(
 
788 -
                                fd, SOL_SOCKET, SO_ERROR, &errn, &len) < 0)
 
789 -
                            errn = errno;
 
790 -
                        if (errn == 0)
 
791 -
                            errn = EIO;
 
792 -
                        op->complete(errn, 0);
 
793 -
                    }
 
794 -
                    else
 
795 -
                    {
 
796 -
                        op->perform_io();
 
797 -
                    }
 
798 -

 
799 -
                    completed_ops_.push(op);
 
800 -
                    ++completions_queued;
 
801 -
                }
 
802  
            {
388  
            {
803 -

 
804 -
            // Clean up empty entries
 
805 -
            if (!state.read_op && !state.write_op)
 
806 -
                registered_fds_.erase(it);
 
807  
            }
389  
            }
808 -
    }
 
809 -

 
810 -
    if (completions_queued > 0)
 
811 -
    {
 
812 -
        if (completions_queued == 1)
 
813 -
            wakeup_event_.notify_one();
 
814 -
        else
 
815 -
            wakeup_event_.notify_all();
 
816 -
    }
 
817 -
}
 
818  
        }
390  
        }
819  

391  

820 -
inline std::size_t
392 +
        for (int i = 0; i < snapshot_count; ++i)
821 -
select_scheduler::do_one(long timeout_us)
393 +
        {
822 -
{
394 +
            int fd                        = snapshot[i].fd;
823 -
    std::unique_lock lock(mutex_);
395 +
            select_descriptor_state* desc = snapshot[i].desc;
824  

396  

825 -
    for (;;)
397 +
            std::uint32_t flags = 0;
826 -
    {
398 +
            if (FD_ISSET(fd, &read_fds))
827 -
        if (stopped_.load(std::memory_order_acquire))
399 +
                flags |= reactor_event_read;
828 -
            return 0;
400 +
            if (FD_ISSET(fd, &write_fds))
 
401 +
                flags |= reactor_event_write;
 
402 +
            if (FD_ISSET(fd, &except_fds))
 
403 +
                flags |= reactor_event_error;
829  

404  

830 -
        scheduler_op* op = completed_ops_.pop();
405 +
            if (flags == 0)
 
406 +
                continue;
831  

407  

832 -
        if (op == &task_op_)
408 +
            desc->add_ready_events(flags);
833 -
        {
 
834 -
            bool more_handlers = !completed_ops_.empty();
 
835  

409  

836 -
            if (!more_handlers)
410 +
            bool expected = false;
 
411 +
            if (desc->is_enqueued_.compare_exchange_strong(
 
412 +
                    expected, true, std::memory_order_release,
 
413 +
                    std::memory_order_relaxed))
837  
            {
414  
            {
838 -
                if (outstanding_work_.load(std::memory_order_acquire) == 0)
415 +
                local_ops.push(desc);
839 -
                {
 
840 -
                    completed_ops_.push(&task_op_);
 
841 -
                    return 0;
 
842 -
                }
 
843 -
                if (timeout_us == 0)
 
844 -
                {
 
845 -
                    completed_ops_.push(&task_op_);
 
846 -
                    return 0;
 
847 -
                }
 
848 -

 
849 -
            reactor_interrupted_ = more_handlers || timeout_us == 0;
 
850 -
            reactor_running_     = true;
 
851 -

 
852 -
            if (more_handlers && idle_thread_count_ > 0)
 
853 -
                wakeup_event_.notify_one();
 
854 -

 
855 -
            run_reactor(lock);
 
856 -

 
857 -
            reactor_running_ = false;
 
858 -
            completed_ops_.push(&task_op_);
 
859 -
            continue;
 
860  
            }
416  
            }
861  
        }
417  
        }
 
418 +
    }
862  

419  

863 -
        if (op != nullptr)
420 +
    lock.lock();
864 -
        {
 
865 -
            lock.unlock();
 
866 -
            select::work_guard g{this};
 
867 -
            (*op)();
 
868 -
            return 1;
 
869 -
        }
 
870 -

 
871 -
        if (outstanding_work_.load(std::memory_order_acquire) == 0)
 
872 -
            return 0;
 
873 -

 
874 -
        if (timeout_us == 0)
 
875 -
            return 0;
 
876  

421  

877 -
        ++idle_thread_count_;
422 +
    if (!local_ops.empty())
878 -
        if (timeout_us < 0)
423 +
        completed_ops_.splice(local_ops);
879 -
            wakeup_event_.wait(lock);
 
880 -
        else
 
881 -
            wakeup_event_.wait_for(lock, std::chrono::microseconds(timeout_us));
 
882 -
        --idle_thread_count_;
 
883 -
    }
 
884  
}
424  
}

} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_HAS_SELECT

#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP