1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_SELECT
15  
#if BOOST_COROSIO_HAS_SELECT
16  

16  

17 -
#include <boost/corosio/detail/config.hpp>
17 +
#include <boost/corosio/native/detail/reactor/reactor_op.hpp>
18 -
#include <boost/corosio/io/io_object.hpp>
18 +
#include <boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp>
19 -
#include <boost/corosio/endpoint.hpp>
 
20 -
#include <boost/capy/ex/executor_ref.hpp>
 
21 -
#include <coroutine>
 
22 -
#include <boost/capy/error.hpp>
 
23 -
#include <system_error>
 
24 -

 
25 -
#include <boost/corosio/native/detail/make_err.hpp>
 
26 -
#include <boost/corosio/detail/dispatch_coro.hpp>
 
27 -
#include <boost/corosio/detail/scheduler_op.hpp>
 
28 -
#include <boost/corosio/native/detail/endpoint_convert.hpp>
 
29 -
#include <unistd.h>
 
30  

19  

31  
#include <errno.h>
20  
#include <errno.h>
32 -

 
33 -
#include <atomic>
 
34 -
#include <cstddef>
 
35 -
#include <memory>
 
36 -
#include <optional>
 
37 -
#include <stop_token>
 
38 -

 
39 -
#include <netinet/in.h>
 
40 -
#include <sys/select.h>
 
41  
#include <fcntl.h>
21  
#include <fcntl.h>
42  
#include <sys/socket.h>
22  
#include <sys/socket.h>
43 -
#include <sys/uio.h>
23 +
#include <unistd.h>
44  

24  

45  
/*
25  
/*
46 -
    select Operation State
26 +
    File descriptors are registered with the select scheduler once (via
47 -
    ======================
27 +
    select_descriptor_state) and stay registered until closed.
48 -

 
49 -
    Each async I/O operation has a corresponding select_op-derived struct that
 
50 -
    holds the operation's state while it's in flight. The socket impl owns
 
51 -
    fixed slots for each operation type (conn_, rd_, wr_), so only one
 
52 -
    operation of each type can be pending per socket at a time.
 
53 -

 
54 -
    This mirrors the epoll_op design for consistency across backends.
 
55 -

 
56 -
    Completion vs Cancellation Race
 
57 -
    -------------------------------
 
58 -
    The `registered` atomic uses a tri-state (unregistered, registering,
 
59 -
    registered) to handle two races: (1) between register_fd() and the
 
60 -
    reactor seeing an event, and (2) between reactor completion and cancel().
 
61 -

 
62 -
    The registering state closes the window where an event could arrive
 
63 -
    after register_fd() but before the boolean was set. The reactor and
 
64 -
    cancel() both treat registering the same as registered when claiming.
 
65 -

 
66 -
    Whoever atomically exchanges to unregistered "claims" the operation
 
67 -
    and is responsible for completing it. The loser sees unregistered and
 
68 -
    does nothing. The initiating thread uses compare_exchange to transition
 
69 -
    from registering to registered; if this fails, the reactor or cancel
 
70 -
    already claimed the op.
 
71  

28  

72 -
    Impl Lifetime Management
29 +
    select() is level-triggered but the descriptor_state pattern
73 -
    ------------------------
30 +
    (designed for edge-triggered) works correctly: is_enqueued_ CAS
74 -
    When cancel() posts an op to the scheduler's ready queue, the socket impl
31 +
    prevents double-enqueue, add_ready_events is idempotent, and
75 -
    might be destroyed before the scheduler processes the op. The `impl_ptr`
32 +
    EAGAIN ops stay parked until the next select() re-reports readiness.
76 -
    member holds a shared_ptr to the impl, keeping it alive until the op
 
77 -
    completes.
 
78  

33  

79 -
    EOF Detection
34 +
    cancel() captures shared_from_this() into op.impl_ptr to prevent
80 -
    -------------
35 +
    use-after-free when the socket is closed with pending ops.
81 -
    For reads, 0 bytes with no error means EOF. But an empty user buffer also
 
82 -
    returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
 
83  

36  

84 -
    SIGPIPE Prevention
37 +
    Writes use sendmsg(MSG_NOSIGNAL) on Linux. On macOS/BSD where
85 -
    ------------------
38 +
    MSG_NOSIGNAL may be absent, SO_NOSIGPIPE is set at socket creation
86 -
    Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
39 +
    and accepted-socket setup instead.
87 -
    SIGPIPE when the peer has closed.
 
88  
*/
40  
*/
89  

41  

90  
namespace boost::corosio::detail {
42  
namespace boost::corosio::detail {
91  

43  

92 -
// Forward declarations for cancellation support
44 +
// Forward declarations
93  
class select_socket;
45  
class select_socket;
94  
class select_acceptor;
46  
class select_acceptor;
 
47 +
struct select_op;
95  

48  

96 -
/** Registration state for async operations.
49 +
// Forward declaration
 
50 +
class select_scheduler;
97  

51  

98 -
    Tri-state enum to handle the race between register_fd() and
52 +
/// Per-descriptor state for persistent select registration.
/// Currently adds no members to reactor_descriptor_state.
99 -
    run_reactor() seeing an event. Setting REGISTERING before
53 +
struct select_descriptor_state final : reactor_descriptor_state
100 -
    calling register_fd() ensures events delivered during the
54 +
{};
101 -
    registration window are not dropped.
 
102 -
*/
 
103 -
enum class select_registration_state : std::uint8_t
 
104 -
{
 
105 -
    unregistered, ///< Not registered with reactor
 
106 -
    registering,  ///< register_fd() called, not yet confirmed
 
107 -
    registered    ///< Fully registered, ready for events
 
108 -
};
 
109  

55  

110 -
struct select_op : scheduler_op
56 +
/// Base operation for the select backend: a thin wrapper over reactor_op
/// instantiated with select_socket and select_acceptor.
 
57 +
struct select_op : reactor_op<select_socket, select_acceptor>
111  
{
58  
{
112 -
    struct canceller
59 +
    void operator()() override;
113 -
    {
 
114 -
        select_op* op;
 
115 -
        void operator()() const noexcept;
 
116 -
    };
 
117 -

 
118 -
    std::coroutine_handle<> h;
 
119 -
    capy::executor_ref ex;
 
120 -
    std::error_code* ec_out = nullptr;
 
121 -
    std::size_t* bytes_out  = nullptr;
 
122 -

 
123 -
    int fd                        = -1;
 
124 -
    int errn                      = 0;
 
125 -
    std::size_t bytes_transferred = 0;
 
126 -

 
127 -
    std::atomic<bool> cancelled{false};
 
128 -
    std::atomic<select_registration_state> registered{
 
129 -
        select_registration_state::unregistered};
 
130 -
    std::optional<std::stop_callback<canceller>> stop_cb;
 
131 -

 
132 -
    // Prevents use-after-free when socket is closed with pending ops.
 
133 -
    std::shared_ptr<void> impl_ptr;
 
134 -

 
135 -
    // For stop_token cancellation - pointer to owning socket/acceptor impl.
 
136 -
    select_socket* socket_impl_     = nullptr;
 
137 -
    select_acceptor* acceptor_impl_ = nullptr;
 
138 -

 
139 -
    select_op() = default;
 
140 -

 
141 -
    void reset() noexcept
 
142 -
    {
 
143 -
        fd                = -1;
 
144 -
        errn              = 0;
 
145 -
        bytes_transferred = 0;
 
146 -
        cancelled.store(false, std::memory_order_relaxed);
 
147 -
        registered.store(
 
148 -
            select_registration_state::unregistered, std::memory_order_relaxed);
 
149 -
        impl_ptr.reset();
 
150 -
        socket_impl_   = nullptr;
 
151 -
        acceptor_impl_ = nullptr;
 
152 -
    }
 
153 -

 
154 -
    void operator()() override
 
155 -
    {
 
156 -
        stop_cb.reset();
 
157 -

 
158 -
        if (ec_out)
 
159 -
        {
 
160 -
            if (cancelled.load(std::memory_order_acquire))
 
161 -
                *ec_out = capy::error::canceled;
 
162 -
            else if (errn != 0)
 
163 -
                *ec_out = make_err(errn);
 
164 -
            else if (is_read_operation() && bytes_transferred == 0)
 
165 -
                *ec_out = capy::error::eof;
 
166 -
            else
 
167 -
                *ec_out = {};
 
168 -
        }
 
169 -

 
170 -
        if (bytes_out)
 
171 -
            *bytes_out = bytes_transferred;
 
172 -

 
173 -
        // Move to stack before destroying the frame
 
174 -
        capy::executor_ref saved_ex(ex);
 
175 -
        std::coroutine_handle<> saved_h(h);
 
176 -
        impl_ptr.reset();
 
177 -
        dispatch_coro(saved_ex, saved_h).resume();
 
178 -
    }
 
179 -

 
180 -
    virtual bool is_read_operation() const noexcept
 
181 -
    {
 
182 -
        return false;
 
183 -
    }
 
184 -
    virtual void cancel() noexcept = 0;
 
185 -

 
186 -
    void destroy() override
 
187 -
    {
 
188 -
        stop_cb.reset();
 
189 -
        impl_ptr.reset();
 
190 -
    }
 
191 -

 
192 -
    void request_cancel() noexcept
 
193 -
    {
 
194 -
        cancelled.store(true, std::memory_order_release);
 
195 -
    }
 
196 -

 
197 -
    void start(std::stop_token const& token)
 
198 -
    {
 
199 -
        cancelled.store(false, std::memory_order_release);
 
200 -
        stop_cb.reset();
 
201 -
        socket_impl_   = nullptr;
 
202 -
        acceptor_impl_ = nullptr;
 
203 -

 
204 -
        if (token.stop_possible())
 
205 -
            stop_cb.emplace(token, canceller{this});
 
206 -
    }
 
207 -

 
208 -
    void start(std::stop_token const& token, select_socket* impl)
 
209 -
    {
 
210 -
        cancelled.store(false, std::memory_order_release);
 
211 -
        stop_cb.reset();
 
212 -
        socket_impl_   = impl;
 
213 -
        acceptor_impl_ = nullptr;
 
214 -

 
215 -
        if (token.stop_possible())
 
216 -
            stop_cb.emplace(token, canceller{this});
 
217 -
    }
 
218 -

 
219 -
    void start(std::stop_token const& token, select_acceptor* impl)
 
220 -
    {
 
221 -
        cancelled.store(false, std::memory_order_release);
 
222 -
        stop_cb.reset();
 
223 -
        socket_impl_   = nullptr;
 
224 -
        acceptor_impl_ = impl;
 
225 -

 
226 -
        if (token.stop_possible())
 
227 -
            stop_cb.emplace(token, canceller{this});
 
228 -
    }
 
229 -

 
230 -
    void complete(int err, std::size_t bytes) noexcept
 
231 -
    {
 
232 -
        errn              = err;
 
233 -
        bytes_transferred = bytes;
 
234 -
    }
 
235 -

 
236 -
    virtual void perform_io() noexcept {}
 
237  
};
60  
};
238  

61  

239 -
struct select_connect_op final : select_op
62 +
/// select connect operation.
 
63 +
struct select_connect_op final : reactor_connect_op<select_op>
240 -
    endpoint target_endpoint;
 
241 -

 
242 -
    void reset() noexcept
 
243 -
    {
 
244 -
        select_op::reset();
 
245 -
        target_endpoint = endpoint{};
 
246 -
    }
 
247 -

 
248 -
    void perform_io() noexcept override
 
249 -
    {
 
250 -
        // connect() completion status is retrieved via SO_ERROR, not return value
 
251 -
        int err       = 0;
 
252 -
        socklen_t len = sizeof(err);
 
253 -
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
 
254 -
            err = errno;
 
255 -
        complete(err, 0);
 
256 -
    }
 
257 -

 
258 -
    // Defined in sockets.cpp where select_socket is complete
 
259  
{
64  
{
260  
    void operator()() override;
65  
    void operator()() override;
261  
    void cancel() noexcept override;
66  
    void cancel() noexcept override;
262  
};
67  
};
263  

68  

264 -
struct select_read_op final : select_op
69 +
/// select scatter-read operation.
 
70 +
struct select_read_op final : reactor_read_op<select_op>
265 -
    static constexpr std::size_t max_buffers = 16;
 
266 -
    iovec iovecs[max_buffers];
 
267 -
    int iovec_count        = 0;
 
268 -
    bool empty_buffer_read = false;
 
269 -

 
270 -
    bool is_read_operation() const noexcept override
 
271 -
    {
 
272 -
        return !empty_buffer_read;
 
273 -
    }
 
274 -

 
275 -
    void reset() noexcept
 
276 -
    {
 
277 -
        select_op::reset();
 
278 -
        iovec_count       = 0;
 
279 -
        empty_buffer_read = false;
 
280 -
    }
 
281 -

 
282 -
    void perform_io() noexcept override
 
283 -
    {
 
284 -
        ssize_t n = ::readv(fd, iovecs, iovec_count);
 
285 -
        if (n >= 0)
 
286 -
            complete(0, static_cast<std::size_t>(n));
 
287 -
        else
 
288 -
            complete(errno, 0);
 
289 -
    }
 
290 -

 
291  
{
71  
{
292  
    void cancel() noexcept override;
72  
    void cancel() noexcept override;
293  
};
73  
};
294  

74  

295 -
struct select_write_op final : select_op
75 +
/** Provides sendmsg() with EINTR retry for select writes.
296 -
{
 
297 -
    static constexpr std::size_t max_buffers = 16;
 
298 -
    iovec iovecs[max_buffers];
 
299 -
    int iovec_count = 0;
 
300 -

 
301 -
    void reset() noexcept
 
302 -
    {
 
303 -
        select_op::reset();
 
304 -
        iovec_count = 0;
 
305 -
    }
 
306  

76  

307 -
    void perform_io() noexcept override
77 +
    Uses MSG_NOSIGNAL where available (Linux). On platforms without
 
78 +
    it (macOS/BSD), SO_NOSIGPIPE is set at socket creation time
 
79 +
    and flags=0 is used here.
 
80 +
*/
 
81 +
struct select_write_policy
 
82 +
{
 
83 +
    static ssize_t write(int fd, iovec* iovecs, int count) noexcept
308  
    {
84  
    {
309  
        msghdr msg{};
85  
        msghdr msg{};
310  
        msg.msg_iov    = iovecs;
86  
        msg.msg_iov    = iovecs;
311 -
        msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
87 +
        msg.msg_iovlen = static_cast<std::size_t>(count);
312  

88  

313 -
        ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
89 +
#ifdef MSG_NOSIGNAL
314 -
        if (n >= 0)
90 +
        constexpr int send_flags = MSG_NOSIGNAL;
315 -
            complete(0, static_cast<std::size_t>(n));
91 +
#else
316 -
        else
92 +
        constexpr int send_flags = 0;
317 -
            complete(errno, 0);
93 +
#endif
318 -
    }
 
319  

94  

320 -
    void cancel() noexcept override;
95 +
        ssize_t n;
 
96 +
        do
 
97 +
        {
 
98 +
            n = ::sendmsg(fd, &msg, send_flags);
 
99 +
        }
 
100 +
        while (n < 0 && errno == EINTR);
 
101 +
        return n;
 
102 +
    }
321  
};
103  
};
322  

104  

323 -
struct select_accept_op final : select_op
105 +
/// select gather-write operation.
 
106 +
struct select_write_op final : reactor_write_op<select_op, select_write_policy>
324  
{
107  
{
325 -
    int accepted_fd                      = -1;
108 +
    void cancel() noexcept override;
326 -
    io_object::implementation* peer_impl = nullptr;
109 +
};
327 -
    io_object::implementation** impl_out = nullptr;
 
328  

110  

329 -
    void reset() noexcept
111 +
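/** Provides accept() with EINTR retry, an FD_SETSIZE check, and fd setup.

    The accepted descriptor is made non-blocking (F_SETFL with O_NONBLOCK)
    and close-on-exec (F_SETFD with FD_CLOEXEC); SO_NOSIGPIPE is also set
    where the platform defines it. If any step after accept() fails, the
    new descriptor is closed, errno reports the cause (EINVAL when the fd
    is >= FD_SETSIZE), and -1 is returned.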
330 -
    {
 
331 -
        select_op::reset();
 
332 -
        accepted_fd = -1;
 
333 -
        peer_impl   = nullptr;
 
334 -
        impl_out    = nullptr;
 
335 -
    }
 
336  

112  

337 -
    void perform_io() noexcept override
113 +
    Uses accept() instead of accept4() for broader POSIX compatibility.
 
114 +
*/
 
115 +
struct select_accept_policy
 
116 +
{
 
117 +
    static int do_accept(int fd, sockaddr_storage& peer) noexcept
338  
    {
118  
    {
339 -
        sockaddr_storage addr_storage{};
119 +
        socklen_t addrlen = sizeof(peer);
340 -
        socklen_t addrlen = sizeof(addr_storage);
120 +
        int new_fd;
 
121 +
        do
 
122 +
        {
 
123 +
            new_fd = ::accept(fd, reinterpret_cast<sockaddr*>(&peer), &addrlen);
 
124 +
        }
 
125 +
        while (new_fd < 0 && errno == EINTR);
341  

126  

342 -
        // Note: select backend uses accept() + fcntl instead of accept4()
127 +
        if (new_fd < 0)
343 -
        // for broader POSIX compatibility
128 +
            return new_fd;
344 -
        int new_fd =
 
345 -
            ::accept(fd, reinterpret_cast<sockaddr*>(&addr_storage), &addrlen);
 
346  

129  

347 -
        if (new_fd >= 0)
130 +
        if (new_fd >= FD_SETSIZE)
348  
        {
131  
        {
349 -
            // Reject fds that exceed select()'s FD_SETSIZE limit.
132 +
            ::close(new_fd);
350 -
            // Better to fail now than during later async operations.
133 +
            errno = EINVAL;
351 -
            if (new_fd >= FD_SETSIZE)
134 +
            return -1;
352 -
            {
135 +
        }
353 -
                ::close(new_fd);
 
354 -
                complete(EINVAL, 0);
 
355 -
                return;
 
356 -
            }
 
357 -

 
358 -
            // Set non-blocking and close-on-exec flags.
 
359 -
            // A non-blocking socket is essential for the async reactor;
 
360 -
            // if we can't configure it, fail rather than risk blocking.
 
361 -
            int flags = ::fcntl(new_fd, F_GETFL, 0);
 
362 -
            if (flags == -1)
 
363 -
            {
 
364 -
                int err = errno;
 
365 -
                ::close(new_fd);
 
366 -
                complete(err, 0);
 
367 -
                return;
 
368 -
            }
 
369  

136  

370 -
            if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
137 +
        int flags = ::fcntl(new_fd, F_GETFL, 0);
371 -
            {
138 +
        if (flags == -1)
372 -
                int err = errno;
139 +
        {
373 -
                ::close(new_fd);
140 +
            int err = errno;
374 -
                complete(err, 0);
141 +
            ::close(new_fd);
375 -
                return;
142 +
            errno = err;
376 -
            }
143 +
            return -1;
 
144 +
        }
377  

145  

378 -
            if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
146 +
        if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
379 -
            {
147 +
        {
380 -
                int err = errno;
148 +
            int err = errno;
381 -
                ::close(new_fd);
149 +
            ::close(new_fd);
382 -
                complete(err, 0);
150 +
            errno = err;
383 -
                return;
151 +
            return -1;
384 -
            }
152 +
        }
385  

153  

386 -
            accepted_fd = new_fd;
154 +
        if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
387 -
            complete(0, 0);
155 +
        {
 
156 +
            int err = errno;
 
157 +
            ::close(new_fd);
 
158 +
            errno = err;
 
159 +
            return -1;
388  
        }
160  
        }
389 -
        else
161 +

 
162 +
#ifdef SO_NOSIGPIPE
 
163 +
        int one = 1;
 
164 +
        if (::setsockopt(
 
165 +
                new_fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)) == -1)
390  
        {
166  
        {
391 -
            complete(errno, 0);
167 +
            int err = errno;
 
168 +
            ::close(new_fd);
 
169 +
            errno = err;
 
170 +
            return -1;
392  
        }
171  
        }
 
172 +
#endif
 
173 +

 
174 +
        return new_fd;
393  
    }
175  
    }
 
176 +
};
394  

177  

395 -
    // Defined in acceptors.cpp where select_acceptor is complete
178 +
/// select accept operation.
 
179 +
struct select_accept_op final
 
180 +
    : reactor_accept_op<select_op, select_accept_policy>
 
181 +
{
396  
    void operator()() override;
182  
    void operator()() override;
397  
    void cancel() noexcept override;
183  
    void cancel() noexcept override;
398  
};
184  
};
399  

185  

400  
} // namespace boost::corosio::detail
186  
} // namespace boost::corosio::detail
401  

187  

402  
#endif // BOOST_COROSIO_HAS_SELECT
188  
#endif // BOOST_COROSIO_HAS_SELECT
403  

189  

404  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP
190  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_OP_HPP