//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include <boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp>
#include <boost/corosio/native/detail/reactor/reactor_op.hpp>

#include <cstddef>

#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/uio.h>

/*
    epoll Operation State
    =====================

    Each async I/O operation has a corresponding epoll_op-derived struct that
    holds the operation's state while it's in flight. The socket impl owns
    fixed slots for each operation type (conn_, rd_, wr_), so only one
    operation of each type can be pending per socket at a time.

    Persistent Registration
    -----------------------
    File descriptors are registered with epoll once (via descriptor_state) and
    stay registered until closed. The descriptor_state tracks which operations
    are pending (read_op, write_op, connect_op). When an event arrives, the
    reactor dispatches to the appropriate pending operation.

    Impl Lifetime Management
    ------------------------
    When cancel() posts an op to the scheduler's ready queue, the socket impl
    might be destroyed before the scheduler processes the op. The `impl_ptr`
    member holds a shared_ptr to the impl, keeping it alive until the op
    completes. This is set by cancel() and cleared in operator() after the
    coroutine is resumed.

    EOF Detection
    -------------
    For reads, 0 bytes with no error means EOF. But an empty user buffer also
    returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.

    SIGPIPE Prevention
    ------------------
    Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
    SIGPIPE when the peer has closed.
*/

namespace boost::corosio::detail {

// Forward declarations
class epoll_socket;
class epoll_acceptor;
struct epoll_op;
class epoll_scheduler;

89 -
/** Per-descriptor state for persistent epoll registration.
65 +
/// Per-descriptor state for persistent epoll registration.
90 -

66 +
struct descriptor_state final : reactor_descriptor_state
91 -
    Tracks pending operations for a file descriptor. The fd is registered
67 +
{};
92 -
    once with epoll and stays registered until closed.
 
93 -

 
94 -
    This struct extends scheduler_op to support deferred I/O processing.
 
95 -
    When epoll events arrive, the reactor sets ready_events and queues
 
96 -
    this descriptor for processing. When popped from the scheduler queue,
 
97 -
    operator() performs the actual I/O and queues completion handlers.
 
98 -

 
99 -
    @par Deferred I/O Model
 
100 -
    The reactor no longer performs I/O directly. Instead:
 
101 -
    1. Reactor sets ready_events and queues descriptor_state
 
102 -
    2. Scheduler pops descriptor_state and calls operator()
 
103 -
    3. operator() performs I/O under mutex and queues completions
 
104 -

 
105 -
    This eliminates per-descriptor mutex locking from the reactor hot path.
 
106 -

 
107 -
    @par Thread Safety
 
108 -
    The mutex protects operation pointers and ready flags during I/O.
 
109 -
    ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
 
110 -
*/
 
111 -
struct descriptor_state final : scheduler_op
 
112 -
{
 
113 -
    std::mutex mutex;
 
114 -

 
115 -
    // Protected by mutex
 
116 -
    epoll_op* read_op    = nullptr;
 
117 -
    epoll_op* write_op   = nullptr;
 
118 -
    epoll_op* connect_op = nullptr;
 
119 -

 
120 -
    // Caches edge events that arrived before an op was registered
 
121 -
    bool read_ready  = false;
 
122 -
    bool write_ready = false;
 
123 -

 
124 -
    // Deferred cancellation: set by cancel() when the target op is not
 
125 -
    // parked (e.g. completing inline via speculative I/O). Checked when
 
126 -
    // the next op parks; if set, the op is immediately self-cancelled.
 
127 -
    // This matches IOCP semantics where CancelIoEx always succeeds.
 
128 -
    bool read_cancel_pending    = false;
 
129 -
    bool write_cancel_pending   = false;
 
130 -
    bool connect_cancel_pending = false;
 
131 -

 
132 -
    // Set during registration only (no mutex needed)
 
133 -
    std::uint32_t registered_events = 0;
 
134 -
    int fd                          = -1;
 
135 -

 
136 -
    // For deferred I/O - set by reactor, read by scheduler
 
137 -
    std::atomic<std::uint32_t> ready_events_{0};
 
138 -
    std::atomic<bool> is_enqueued_{false};
 
139 -
    epoll_scheduler const* scheduler_ = nullptr;
 
140 -

 
141 -
    // Prevents impl destruction while this descriptor_state is queued.
 
142 -
    // Set by close_socket() when is_enqueued_ is true, cleared by operator().
 
143 -
    std::shared_ptr<void> impl_ref_;
 
144 -

 
145 -
    /// Add ready events atomically.
 
146 -
    void add_ready_events(std::uint32_t ev) noexcept
 
147 -
    {
 
148 -
        ready_events_.fetch_or(ev, std::memory_order_relaxed);
 
149 -
    }
 
150 -

 
151 -
    /// Perform deferred I/O and queue completions.
 
152 -
    void operator()() override;
 
153 -

 
154 -
    /// Destroy without invoking.
 
155 -
    /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
 
156 -
    /// the self-referential cycle set by close_socket().
 
157 -
    void destroy() override
 
158 -
    {
 
159 -
        impl_ref_.reset();
 
160 -
    }
 
161 -
};
 
162  

68  

163 -
struct epoll_op : scheduler_op
69 +
/// epoll base operation — thin wrapper over reactor_op.
 
70 +
struct epoll_op : reactor_op<epoll_socket, epoll_acceptor>
164 -
    struct canceller
 
165 -
    {
 
166 -
        epoll_op* op;
 
167 -
        void operator()() const noexcept;
 
168 -
    };
 
169 -

 
170 -
    std::coroutine_handle<> h;
 
171 -
    capy::executor_ref ex;
 
172 -
    std::error_code* ec_out = nullptr;
 
173 -
    std::size_t* bytes_out  = nullptr;
 
174 -

 
175 -
    int fd                        = -1;
 
176 -
    int errn                      = 0;
 
177 -
    std::size_t bytes_transferred = 0;
 
178 -

 
179 -
    std::atomic<bool> cancelled{false};
 
180 -
    std::optional<std::stop_callback<canceller>> stop_cb;
 
181 -

 
182 -
    // Prevents use-after-free when socket is closed with pending ops.
 
183 -
    // See "Impl Lifetime Management" in file header.
 
184 -
    std::shared_ptr<void> impl_ptr;
 
185 -

 
186 -
    // For stop_token cancellation - pointer to owning socket/acceptor impl.
 
187 -
    // When stop is requested, we call back to the impl to perform actual I/O cancellation.
 
188 -
    epoll_socket* socket_impl_     = nullptr;
 
189 -
    epoll_acceptor* acceptor_impl_ = nullptr;
 
190 -

 
191 -
    epoll_op() = default;
 
192 -

 
193 -
    void reset() noexcept
 
194 -
    {
 
195 -
        fd                = -1;
 
196 -
        errn              = 0;
 
197 -
        bytes_transferred = 0;
 
198 -
        cancelled.store(false, std::memory_order_relaxed);
 
199 -
        impl_ptr.reset();
 
200 -
        socket_impl_   = nullptr;
 
201 -
        acceptor_impl_ = nullptr;
 
202 -
    }
 
203 -

 
204 -
    // Defined in sockets.cpp where epoll_socket is complete
 
205  
{
71  
{
206 -

 
207 -
    virtual bool is_read_operation() const noexcept
 
208 -
    {
 
209 -
        return false;
 
210 -
    }
 
211 -
    virtual void cancel() noexcept = 0;
 
212 -

 
213 -
    void destroy() override
 
214 -
    {
 
215 -
        stop_cb.reset();
 
216 -
        impl_ptr.reset();
 
217 -
    }
 
218 -

 
219 -
    void request_cancel() noexcept
 
220 -
    {
 
221 -
        cancelled.store(true, std::memory_order_release);
 
222 -
    }
 
223 -

 
224 -
    void start(std::stop_token const& token, epoll_socket* impl)
 
225 -
    {
 
226 -
        cancelled.store(false, std::memory_order_release);
 
227 -
        stop_cb.reset();
 
228 -
        socket_impl_   = impl;
 
229 -
        acceptor_impl_ = nullptr;
 
230 -

 
231 -
        if (token.stop_possible())
 
232 -
            stop_cb.emplace(token, canceller{this});
 
233 -
    }
 
234 -

 
235 -
    void start(std::stop_token const& token, epoll_acceptor* impl)
 
236 -
    {
 
237 -
        cancelled.store(false, std::memory_order_release);
 
238 -
        stop_cb.reset();
 
239 -
        socket_impl_   = nullptr;
 
240 -
        acceptor_impl_ = impl;
 
241 -

 
242 -
        if (token.stop_possible())
 
243 -
            stop_cb.emplace(token, canceller{this});
 
244 -
    }
 
245 -

 
246 -
    void complete(int err, std::size_t bytes) noexcept
 
247 -
    {
 
248 -
        errn              = err;
 
249 -
        bytes_transferred = bytes;
 
250 -
    }
 
251 -

 
252 -
    virtual void perform_io() noexcept {}
 
253  
    void operator()() override;
72  
    void operator()() override;
254  
};
73  
};
255  

74  

256 -
struct epoll_connect_op final : epoll_op
75 +
/// epoll connect operation.
 
76 +
struct epoll_connect_op final : reactor_connect_op<epoll_op>
257 -
    endpoint target_endpoint;
 
258 -

 
259 -
    void reset() noexcept
 
260 -
    {
 
261 -
        epoll_op::reset();
 
262 -
        target_endpoint = endpoint{};
 
263 -
    }
 
264 -

 
265 -
    void perform_io() noexcept override
 
266 -
    {
 
267 -
        // connect() completion status is retrieved via SO_ERROR, not return value
 
268 -
        int err       = 0;
 
269 -
        socklen_t len = sizeof(err);
 
270 -
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
 
271 -
            err = errno;
 
272 -
        complete(err, 0);
 
273 -
    }
 
274 -

 
275 -
    // Defined in sockets.cpp where epoll_socket is complete
 
276  
{
77  
{
277  
    void operator()() override;
78  
    void operator()() override;
278  
    void cancel() noexcept override;
79  
    void cancel() noexcept override;
279  
};
80  
};
280  

81  

281 -
struct epoll_read_op final : epoll_op
82 +
/// epoll scatter-read operation.
 
83 +
struct epoll_read_op final : reactor_read_op<epoll_op>
282 -
    static constexpr std::size_t max_buffers = 16;
 
283 -
    iovec iovecs[max_buffers];
 
284 -
    int iovec_count        = 0;
 
285 -
    bool empty_buffer_read = false;
 
286 -

 
287 -
    bool is_read_operation() const noexcept override
 
288 -
    {
 
289 -
        return !empty_buffer_read;
 
290 -
    }
 
291 -

 
292 -
    void reset() noexcept
 
293 -
    {
 
294 -
        epoll_op::reset();
 
295 -
        iovec_count       = 0;
 
296 -
        empty_buffer_read = false;
 
297 -
    }
 
298 -

 
299 -
    void perform_io() noexcept override
 
300 -
    {
 
301 -
        ssize_t n;
 
302 -
        do
 
303 -
        {
 
304 -
            n = ::readv(fd, iovecs, iovec_count);
 
305 -
        }
 
306 -
        while (n < 0 && errno == EINTR);
 
307 -

 
308 -
        if (n >= 0)
 
309 -
            complete(0, static_cast<std::size_t>(n));
 
310 -
        else
 
311 -
            complete(errno, 0);
 
312 -
    }
 
313 -

 
314  
{
84  
{
315  
    void cancel() noexcept override;
85  
    void cancel() noexcept override;
316  
};
86  
};
317  

87  

/** Provides sendmsg(MSG_NOSIGNAL) with EINTR retry for epoll writes.

    sendmsg() is used instead of writev() so MSG_NOSIGNAL can be passed,
    preventing SIGPIPE when the peer has closed.
*/
struct epoll_write_policy
{
    /** Gather-write `count` buffers to `fd`.

        Retries if interrupted by a signal (EINTR).

        @return Bytes written on success; -1 with errno set on failure.
    */
    static ssize_t write(int fd, iovec* iovecs, int count) noexcept
    {
        msghdr msg{};
        msg.msg_iov    = iovecs;
        msg.msg_iovlen = static_cast<std::size_t>(count);

        ssize_t n;
        do
        {
            n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
        }
        while (n < 0 && errno == EINTR);
        return n;
    }
};

 
107 +
/// epoll gather-write operation.
 
108 +
struct epoll_write_op final : reactor_write_op<epoll_op, epoll_write_policy>
 
109 +
{
349  
    void cancel() noexcept override;
110  
    void cancel() noexcept override;
350  
};
111  
};
351  

112  

/** Provides accept4(SOCK_NONBLOCK | SOCK_CLOEXEC) with EINTR retry.

    accept4() creates the accepted socket non-blocking and close-on-exec
    in a single syscall, avoiding a separate fcntl() round trip.
*/
struct epoll_accept_policy
{
    /** Accept one connection on listening socket `fd`.

        Retries if interrupted by a signal (EINTR).

        @param peer Receives the peer address on success.
        @return New descriptor on success; -1 with errno set on failure.
    */
    static int do_accept(int fd, sockaddr_storage& peer) noexcept
    {
        socklen_t addrlen = sizeof(peer);
        int new_fd;
        do
        {
            new_fd = ::accept4(
                fd, reinterpret_cast<sockaddr*>(&peer), &addrlen,
                SOCK_NONBLOCK | SOCK_CLOEXEC);
        }
        while (new_fd < 0 && errno == EINTR);
        return new_fd;
    }
};

389 -
    // Defined in acceptors.cpp where epoll_acceptor is complete
131 +
/// epoll accept operation.
 
132 +
struct epoll_accept_op final : reactor_accept_op<epoll_op, epoll_accept_policy>
 
133 +
{
390  
    void operator()() override;
134  
    void operator()() override;
391  
    void cancel() noexcept override;
135  
    void cancel() noexcept override;
392  
};
136  
};

} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_HAS_EPOLL

#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_OP_HPP