1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
20  

20  

21  
#include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
21  
#include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
 
23 +
#include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
23  

24  

24 -
#include <boost/corosio/native/detail/endpoint_convert.hpp>
25 +
#include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25 -
#include <boost/corosio/native/detail/make_err.hpp>
 
26 -
#include <boost/corosio/detail/dispatch_coro.hpp>
 
27 -
#include <boost/corosio/detail/except.hpp>
 
28 -
#include <boost/capy/buffers.hpp>
 
29  

26  

30  
#include <coroutine>
27  
#include <coroutine>
31 -
#include <unordered_map>
 
32  
#include <mutex>
28  
#include <mutex>
33  
#include <utility>
29  
#include <utility>
34  

30  

35  
#include <errno.h>
31  
#include <errno.h>
36  
#include <netinet/in.h>
32  
#include <netinet/in.h>
37  
#include <netinet/tcp.h>
33  
#include <netinet/tcp.h>
38  
#include <sys/epoll.h>
34  
#include <sys/epoll.h>
39  
#include <sys/socket.h>
35  
#include <sys/socket.h>
40  
#include <unistd.h>
36  
#include <unistd.h>
41  

37  

42  
/*
38  
/*
43  
    epoll Socket Implementation
39  
    epoll Socket Implementation
44  
    ===========================
40  
    ===========================
45  

41  

46  
    Each I/O operation follows the same pattern:
42  
    Each I/O operation follows the same pattern:
47  
      1. Try the syscall immediately (non-blocking socket)
43  
      1. Try the syscall immediately (non-blocking socket)
48  
      2. If it succeeds or fails with a real error, post to completion queue
44  
      2. If it succeeds or fails with a real error, post to completion queue
49  
      3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
45  
      3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
50  

46  

51  
    This "try first" approach avoids unnecessary epoll round-trips for
47  
    This "try first" approach avoids unnecessary epoll round-trips for
52  
    operations that can complete immediately (common for small reads/writes
48  
    operations that can complete immediately (common for small reads/writes
53  
    on fast local connections).
49  
    on fast local connections).
54  

50  

55  
    One-Shot Registration
51  
    One-Shot Registration
56  
    ---------------------
52  
    ---------------------
57  
    We use one-shot epoll registration: each operation registers, waits for
53  
    We use one-shot epoll registration: each operation registers, waits for
58  
    one event, then unregisters. This simplifies the state machine since we
54  
    one event, then unregisters. This simplifies the state machine since we
59  
    don't need to track whether an fd is currently registered or handle
55  
    don't need to track whether an fd is currently registered or handle
60  
    re-arming. The tradeoff is slightly more epoll_ctl calls, but the
56  
    re-arming. The tradeoff is slightly more epoll_ctl calls, but the
61  
    simplicity is worth it.
57  
    simplicity is worth it.
62  

58  

63  
    Cancellation
59  
    Cancellation
64  
    ------------
60  
    ------------
65  
    See op.hpp for the completion/cancellation race handling via the
61  
    See op.hpp for the completion/cancellation race handling via the
66  
    `registered` atomic. cancel() must complete pending operations (post
62  
    `registered` atomic. cancel() must complete pending operations (post
67  
    them with cancelled flag) so coroutines waiting on them can resume.
63  
    them with cancelled flag) so coroutines waiting on them can resume.
68  
    close_socket() calls cancel() first to ensure this.
64  
    close_socket() calls cancel() first to ensure this.
69  

65  

70  
    Impl Lifetime with shared_ptr
66  
    Impl Lifetime with shared_ptr
71  
    -----------------------------
67  
    -----------------------------
72  
    Socket impls use enable_shared_from_this. The service owns impls via
68  
    Socket impls use enable_shared_from_this. The service owns impls via
73 -
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
69 +
    shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and
74  
    removal. When a user calls close(), we call cancel() which posts pending
70  
    removal. When a user calls close(), we call cancel() which posts pending
75  
    ops to the scheduler.
71  
    ops to the scheduler.
76  

72  

77  
    CRITICAL: The posted ops must keep the impl alive until they complete.
73  
    CRITICAL: The posted ops must keep the impl alive until they complete.
78  
    Otherwise the scheduler would process a freed op (use-after-free). The
74  
    Otherwise the scheduler would process a freed op (use-after-free). The
79  
    cancel() method captures shared_from_this() into op.impl_ptr before
75  
    cancel() method captures shared_from_this() into op.impl_ptr before
80  
    posting. When the op completes, impl_ptr is cleared, allowing the impl
76  
    posting. When the op completes, impl_ptr is cleared, allowing the impl
81  
    to be destroyed if no other references exist.
77  
    to be destroyed if no other references exist.
82  

78  

83  
    Service Ownership
79  
    Service Ownership
84  
    -----------------
80  
    -----------------
85  
    epoll_socket_service owns all socket impls. destroy_impl() removes the
81  
    epoll_socket_service owns all socket impls. destroy_impl() removes the
86  
    shared_ptr from the map, but the impl may survive if ops still hold
82  
    shared_ptr from the map, but the impl may survive if ops still hold
87  
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
83  
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
88  
    in-flight ops will complete and release their refs.
84  
    in-flight ops will complete and release their refs.
89  
*/
85  
*/
90  

86  

91  
namespace boost::corosio::detail {
87  
namespace boost::corosio::detail {
92  

88  

93 -
/** State for epoll socket service. */
89 +
/// State for epoll socket service.
94 -
class epoll_socket_state
90 +
using epoll_socket_state = reactor_service_state<epoll_scheduler, epoll_socket>;
95 -
{
 
96 -
public:
 
97 -
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
 
98 -
    {
 
99 -
    }
 
100 -

 
101 -
    epoll_scheduler& sched_;
 
102 -
    std::mutex mutex_;
 
103 -
    intrusive_list<epoll_socket> socket_list_;
 
104 -
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
 
105 -
        socket_ptrs_;
 
106 -
};
 
107  

91  

108  
/** epoll socket service implementation.
92  
/** epoll socket service implementation.
109  

93  

110  
    Inherits from socket_service to enable runtime polymorphism.
94  
    Inherits from socket_service to enable runtime polymorphism.
111  
    Uses key_type = socket_service for service lookup.
95  
    Uses key_type = socket_service for service lookup.
112  
*/
96  
*/
113  
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
97  
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
114  
{
98  
{
115  
public:
99  
public:
116  
    explicit epoll_socket_service(capy::execution_context& ctx);
100  
    explicit epoll_socket_service(capy::execution_context& ctx);
117  
    ~epoll_socket_service() override;
101  
    ~epoll_socket_service() override;
118  

102  

119  
    epoll_socket_service(epoll_socket_service const&)            = delete;
103  
    epoll_socket_service(epoll_socket_service const&)            = delete;
120  
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;
104  
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;
121  

105  

122  
    void shutdown() override;
106  
    void shutdown() override;
123  

107  

124  
    io_object::implementation* construct() override;
108  
    io_object::implementation* construct() override;
125  
    void destroy(io_object::implementation*) override;
109  
    void destroy(io_object::implementation*) override;
126  
    void close(io_object::handle&) override;
110  
    void close(io_object::handle&) override;
127  
    std::error_code open_socket(
111  
    std::error_code open_socket(
128  
        tcp_socket::implementation& impl,
112  
        tcp_socket::implementation& impl,
129  
        int family,
113  
        int family,
130  
        int type,
114  
        int type,
131  
        int protocol) override;
115  
        int protocol) override;
132  

116  

133  
    epoll_scheduler& scheduler() const noexcept
117  
    epoll_scheduler& scheduler() const noexcept
134  
    {
118  
    {
135  
        return state_->sched_;
119  
        return state_->sched_;
136  
    }
120  
    }
137 -
    void post(epoll_op* op);
121 +
    void post(scheduler_op* op);
138  
    void work_started() noexcept;
122  
    void work_started() noexcept;
139  
    void work_finished() noexcept;
123  
    void work_finished() noexcept;
140  

124  

141  
private:
125  
private:
142  
    std::unique_ptr<epoll_socket_state> state_;
126  
    std::unique_ptr<epoll_socket_state> state_;
143  
};
127  
};
144 -
//--------------------------------------------------------------------------
 
145 -
//
 
146 -
// Implementation
 
147 -
//
 
148 -
//--------------------------------------------------------------------------
 
149 -

 
150 -
// Register an op with the reactor, handling cached edge events.
 
151 -
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
 
152 -
inline void
 
153 -
epoll_socket::register_op(
 
154 -
    epoll_op& op,
 
155 -
    epoll_op*& desc_slot,
 
156 -
    bool& ready_flag,
 
157 -
    bool& cancel_flag) noexcept
 
158 -
{
 
159 -
    svc_.work_started();
 
160 -

 
161 -
    std::lock_guard lock(desc_state_.mutex);
 
162 -
    bool io_done = false;
 
163 -
    if (ready_flag)
 
164 -
    {
 
165 -
        ready_flag = false;
 
166 -
        op.perform_io();
 
167 -
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
 
168 -
        if (!io_done)
 
169 -
            op.errn = 0;
 
170 -
    }
 
171 -

 
172 -
    if (cancel_flag)
 
173 -
    {
 
174 -
        cancel_flag = false;
 
175 -
        op.cancelled.store(true, std::memory_order_relaxed);
 
176 -
    }
 
177 -

 
178 -
    if (io_done || op.cancelled.load(std::memory_order_acquire))
 
179 -
    {
 
180 -
        svc_.post(&op);
 
181 -
        svc_.work_finished();
 
182 -
    }
 
183 -
    else
 
184 -
    {
 
185 -
        desc_slot = &op;
 
186 -
    }
 
187 -
}
 
188 -

 
189 -
inline void
 
190 -
epoll_op::canceller::operator()() const noexcept
 
191 -
{
 
192 -
    op->cancel();
 
193 -
}
 
194 -

 
195  

128  

196  
inline void
129  
inline void
197  
epoll_connect_op::cancel() noexcept
130  
epoll_connect_op::cancel() noexcept
198  
{
131  
{
199  
    if (socket_impl_)
132  
    if (socket_impl_)
200  
        socket_impl_->cancel_single_op(*this);
133  
        socket_impl_->cancel_single_op(*this);
201  
    else
134  
    else
202  
        request_cancel();
135  
        request_cancel();
203  
}
136  
}
204  

137  

205  
inline void
138  
inline void
206  
epoll_read_op::cancel() noexcept
139  
epoll_read_op::cancel() noexcept
207  
{
140  
{
208  
    if (socket_impl_)
141  
    if (socket_impl_)
209  
        socket_impl_->cancel_single_op(*this);
142  
        socket_impl_->cancel_single_op(*this);
210  
    else
143  
    else
211  
        request_cancel();
144  
        request_cancel();
212  
}
145  
}
213  

146  

214  
inline void
147  
inline void
215  
epoll_write_op::cancel() noexcept
148  
epoll_write_op::cancel() noexcept
216  
{
149  
{
217  
    if (socket_impl_)
150  
    if (socket_impl_)
218  
        socket_impl_->cancel_single_op(*this);
151  
        socket_impl_->cancel_single_op(*this);
219  
    else
152  
    else
220  
        request_cancel();
153  
        request_cancel();
221  
}
154  
}
222  

155  

223  
inline void
156  
inline void
224  
epoll_op::operator()()
157  
epoll_op::operator()()
225  
{
158  
{
226 -
    stop_cb.reset();
159 +
    complete_io_op(*this);
227 -

 
228 -
    socket_impl_->svc_.scheduler().reset_inline_budget();
 
229 -

 
230 -
    if (cancelled.load(std::memory_order_acquire))
 
231 -
        *ec_out = capy::error::canceled;
 
232 -
    else if (errn != 0)
 
233 -
        *ec_out = make_err(errn);
 
234 -
    else if (is_read_operation() && bytes_transferred == 0)
 
235 -
        *ec_out = capy::error::eof;
 
236 -
    else
 
237 -
        *ec_out = {};
 
238 -

 
239 -
    *bytes_out = bytes_transferred;
 
240 -

 
241 -
    // Move to stack before resuming coroutine. The coroutine might close
 
242 -
    // the socket, releasing the last wrapper ref. If impl_ptr were the
 
243 -
    // last ref and we destroyed it while still in operator(), we'd have
 
244 -
    // use-after-free. Moving to local ensures destruction happens at
 
245 -
    // function exit, after all member accesses are complete.
 
246 -
    capy::executor_ref saved_ex(ex);
 
247 -
    std::coroutine_handle<> saved_h(h);
 
248 -
    auto prevent_premature_destruction = std::move(impl_ptr);
 
249 -
    dispatch_coro(saved_ex, saved_h).resume();
 
250  
}
160  
}
251  

161  

252  
inline void
162  
inline void
253  
epoll_connect_op::operator()()
163  
epoll_connect_op::operator()()
254  
{
164  
{
255 -
    stop_cb.reset();
165 +
    complete_connect_op(*this);
256 -

 
257 -
    socket_impl_->svc_.scheduler().reset_inline_budget();
 
258 -

 
259 -
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
 
260 -

 
261 -
    // Cache endpoints on successful connect
 
262 -
    if (success && socket_impl_)
 
263 -
    {
 
264 -
        endpoint local_ep;
 
265 -
        sockaddr_storage local_storage{};
 
266 -
        socklen_t local_len = sizeof(local_storage);
 
267 -
        if (::getsockname(
 
268 -
                fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
 
269 -
            0)
 
270 -
            local_ep = from_sockaddr(local_storage);
 
271 -
        static_cast<epoll_socket*>(socket_impl_)
 
272 -
            ->set_endpoints(local_ep, target_endpoint);
 
273 -
    }
 
274 -

 
275 -
    if (cancelled.load(std::memory_order_acquire))
 
276 -
        *ec_out = capy::error::canceled;
 
277 -
    else if (errn != 0)
 
278 -
        *ec_out = make_err(errn);
 
279 -
    else
 
280 -
        *ec_out = {};
 
281 -

 
282 -
    // Move to stack before resuming. See epoll_op::operator()() for rationale.
 
283 -
    capy::executor_ref saved_ex(ex);
 
284 -
    std::coroutine_handle<> saved_h(h);
 
285 -
    auto prevent_premature_destruction = std::move(impl_ptr);
 
286 -
    dispatch_coro(saved_ex, saved_h).resume();
 
287  
}
166  
}
288  

167  

289  
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
168  
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
290 -
    : svc_(svc)
169 +
    : reactor_socket(svc)
291  
{
170  
{
292  
}
171  
}
293  

172  

294  
inline epoll_socket::~epoll_socket() = default;
173  
inline epoll_socket::~epoll_socket() = default;
295  

174  

296  
inline std::coroutine_handle<>
175  
inline std::coroutine_handle<>
297  
epoll_socket::connect(
176  
epoll_socket::connect(
298  
    std::coroutine_handle<> h,
177  
    std::coroutine_handle<> h,
299  
    capy::executor_ref ex,
178  
    capy::executor_ref ex,
300  
    endpoint ep,
179  
    endpoint ep,
301  
    std::stop_token token,
180  
    std::stop_token token,
302  
    std::error_code* ec)
181  
    std::error_code* ec)
303  
{
182  
{
304 -
    auto& op = conn_;
183 +
    return do_connect(h, ex, ep, token, ec);
305 -

 
306 -
    sockaddr_storage storage{};
 
307 -
    socklen_t addrlen =
 
308 -
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
 
309 -
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
 
310 -

 
311 -
    if (result == 0)
 
312 -
    {
 
313 -
        sockaddr_storage local_storage{};
 
314 -
        socklen_t local_len = sizeof(local_storage);
 
315 -
        if (::getsockname(
 
316 -
                fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
 
317 -
            0)
 
318 -
            local_endpoint_ = detail::from_sockaddr(local_storage);
 
319 -
        remote_endpoint_ = ep;
 
320 -
    }
 
321 -

 
322 -
    if (result == 0 || errno != EINPROGRESS)
 
323 -
    {
 
324 -
        int err = (result < 0) ? errno : 0;
 
325 -
        if (svc_.scheduler().try_consume_inline_budget())
 
326 -
        {
 
327 -
            *ec = err ? make_err(err) : std::error_code{};
 
328 -
            return dispatch_coro(ex, h);
 
329 -
        }
 
330 -
        op.reset();
 
331 -
        op.h               = h;
 
332 -
        op.ex              = ex;
 
333 -
        op.ec_out          = ec;
 
334 -
        op.fd              = fd_;
 
335 -
        op.target_endpoint = ep;
 
336 -
        op.start(token, this);
 
337 -
        op.impl_ptr = shared_from_this();
 
338 -
        op.complete(err, 0);
 
339 -
        svc_.post(&op);
 
340 -
        return std::noop_coroutine();
 
341 -
    }
 
342 -

 
343 -
    // EINPROGRESS — register with reactor
 
344 -
    op.reset();
 
345 -
    op.h               = h;
 
346 -
    op.ex              = ex;
 
347 -
    op.ec_out          = ec;
 
348 -
    op.fd              = fd_;
 
349 -
    op.target_endpoint = ep;
 
350 -
    op.start(token, this);
 
351 -
    op.impl_ptr = shared_from_this();
 
352 -

 
353 -
    register_op(
 
354 -
        op, desc_state_.connect_op, desc_state_.write_ready,
 
355 -
        desc_state_.connect_cancel_pending);
 
356 -
    return std::noop_coroutine();
 
357  
}
184  
}
358  

185  

359  
inline std::coroutine_handle<>
186  
inline std::coroutine_handle<>
360  
epoll_socket::read_some(
187  
epoll_socket::read_some(
361  
    std::coroutine_handle<> h,
188  
    std::coroutine_handle<> h,
362  
    capy::executor_ref ex,
189  
    capy::executor_ref ex,
363  
    buffer_param param,
190  
    buffer_param param,
364  
    std::stop_token token,
191  
    std::stop_token token,
365  
    std::error_code* ec,
192  
    std::error_code* ec,
366  
    std::size_t* bytes_out)
193  
    std::size_t* bytes_out)
367  
{
194  
{
368 -
    auto& op = rd_;
195 +
    return do_read_some(h, ex, param, token, ec, bytes_out);
369 -
    op.reset();
 
370 -

 
371 -
    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
 
372 -
    op.iovec_count =
 
373 -
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
 
374 -

 
375 -
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
 
376 -
    {
 
377 -
        op.empty_buffer_read = true;
 
378 -
        op.h                 = h;
 
379 -
        op.ex                = ex;
 
380 -
        op.ec_out            = ec;
 
381 -
        op.bytes_out         = bytes_out;
 
382 -
        op.start(token, this);
 
383 -
        op.impl_ptr = shared_from_this();
 
384 -
        op.complete(0, 0);
 
385 -
        svc_.post(&op);
 
386 -
        return std::noop_coroutine();
 
387 -
    }
 
388 -

 
389 -
    for (int i = 0; i < op.iovec_count; ++i)
 
390 -
    {
 
391 -
        op.iovecs[i].iov_base = bufs[i].data();
 
392 -
        op.iovecs[i].iov_len  = bufs[i].size();
 
393 -
    }
 
394 -

 
395 -
    // Speculative read
 
396 -
    ssize_t n;
 
397 -
    do
 
398 -
    {
 
399 -
        n = ::readv(fd_, op.iovecs, op.iovec_count);
 
400 -
    }
 
401 -
    while (n < 0 && errno == EINTR);
 
402 -

 
403 -
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
 
404 -
    {
 
405 -
        int err    = (n < 0) ? errno : 0;
 
406 -
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
 
407 -

 
408 -
        if (svc_.scheduler().try_consume_inline_budget())
 
409 -
        {
 
410 -
            if (err)
 
411 -
                *ec = make_err(err);
 
412 -
            else if (n == 0)
 
413 -
                *ec = capy::error::eof;
 
414 -
            else
 
415 -
                *ec = {};
 
416 -
            *bytes_out = bytes;
 
417 -
            return dispatch_coro(ex, h);
 
418 -
        }
 
419 -
        op.h         = h;
 
420 -
        op.ex        = ex;
 
421 -
        op.ec_out    = ec;
 
422 -
        op.bytes_out = bytes_out;
 
423 -
        op.start(token, this);
 
424 -
        op.impl_ptr = shared_from_this();
 
425 -
        op.complete(err, bytes);
 
426 -
        svc_.post(&op);
 
427 -
        return std::noop_coroutine();
 
428 -
    }
 
429 -

 
430 -
    // EAGAIN — register with reactor
 
431 -
    op.h         = h;
 
432 -
    op.ex        = ex;
 
433 -
    op.ec_out    = ec;
 
434 -
    op.bytes_out = bytes_out;
 
435 -
    op.fd        = fd_;
 
436 -
    op.start(token, this);
 
437 -
    op.impl_ptr = shared_from_this();
 
438 -

 
439 -
    register_op(
 
440 -
        op, desc_state_.read_op, desc_state_.read_ready,
 
441 -
        desc_state_.read_cancel_pending);
 
442 -
    return std::noop_coroutine();
 
443  
}
196  
}
444  

197  

445  
inline std::coroutine_handle<>
198  
inline std::coroutine_handle<>
446  
epoll_socket::write_some(
199  
epoll_socket::write_some(
447  
    std::coroutine_handle<> h,
200  
    std::coroutine_handle<> h,
448  
    capy::executor_ref ex,
201  
    capy::executor_ref ex,
449  
    buffer_param param,
202  
    buffer_param param,
450  
    std::stop_token token,
203  
    std::stop_token token,
451  
    std::error_code* ec,
204  
    std::error_code* ec,
452  
    std::size_t* bytes_out)
205  
    std::size_t* bytes_out)
453  
{
206  
{
454 -
    auto& op = wr_;
207 +
    return do_write_some(h, ex, param, token, ec, bytes_out);
455 -
    op.reset();
 
456 -

 
457 -
    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
 
458 -
    op.iovec_count =
 
459 -
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
 
460 -

 
461 -
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
 
462 -
    {
 
463 -
        op.h         = h;
 
464 -
        op.ex        = ex;
 
465 -
        op.ec_out    = ec;
 
466 -
        op.bytes_out = bytes_out;
 
467 -
        op.start(token, this);
 
468 -
        op.impl_ptr = shared_from_this();
 
469 -
        op.complete(0, 0);
 
470 -
        svc_.post(&op);
 
471 -
        return std::noop_coroutine();
 
472 -
    }
 
473 -

 
474 -
    for (int i = 0; i < op.iovec_count; ++i)
 
475 -
    {
 
476 -
        op.iovecs[i].iov_base = bufs[i].data();
 
477 -
        op.iovecs[i].iov_len  = bufs[i].size();
 
478 -
    }
 
479 -

 
480 -
    // Speculative write
 
481 -
    msghdr msg{};
 
482 -
    msg.msg_iov    = op.iovecs;
 
483 -
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
 
484 -

 
485 -
    ssize_t n;
 
486 -
    do
 
487 -
    {
 
488 -
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
 
489 -
    }
 
490 -
    while (n < 0 && errno == EINTR);
 
491 -

 
492 -
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
 
493 -
    {
 
494 -
        int err    = (n < 0) ? errno : 0;
 
495 -
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
 
496 -

 
497 -
        if (svc_.scheduler().try_consume_inline_budget())
 
498 -
        {
 
499 -
            *ec        = err ? make_err(err) : std::error_code{};
 
500 -
            *bytes_out = bytes;
 
501 -
            return dispatch_coro(ex, h);
 
502 -
        }
 
503 -
        op.h         = h;
 
504 -
        op.ex        = ex;
 
505 -
        op.ec_out    = ec;
 
506 -
        op.bytes_out = bytes_out;
 
507 -
        op.start(token, this);
 
508 -
        op.impl_ptr = shared_from_this();
 
509 -
        op.complete(err, bytes);
 
510 -
        svc_.post(&op);
 
511 -
        return std::noop_coroutine();
 
512 -
    }
 
513 -

 
514 -
    // EAGAIN — register with reactor
 
515 -
    op.h         = h;
 
516 -
    op.ex        = ex;
 
517 -
    op.ec_out    = ec;
 
518 -
    op.bytes_out = bytes_out;
 
519 -
    op.fd        = fd_;
 
520 -
    op.start(token, this);
 
521 -
    op.impl_ptr = shared_from_this();
 
522 -

 
523 -
    register_op(
 
524 -
        op, desc_state_.write_op, desc_state_.write_ready,
 
525 -
        desc_state_.write_cancel_pending);
 
526 -
    return std::noop_coroutine();
 
527 -
}
 
528 -

 
529 -
inline std::error_code
 
530 -
epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
 
531 -
{
 
532 -
    int how;
 
533 -
    switch (what)
 
534 -
    {
 
535 -
    case tcp_socket::shutdown_receive:
 
536 -
        how = SHUT_RD;
 
537 -
        break;
 
538 -
    case tcp_socket::shutdown_send:
 
539 -
        how = SHUT_WR;
 
540 -
        break;
 
541 -
    case tcp_socket::shutdown_both:
 
542 -
        how = SHUT_RDWR;
 
543 -
        break;
 
544 -
    default:
 
545 -
        return make_err(EINVAL);
 
546 -
    }
 
547 -
    if (::shutdown(fd_, how) != 0)
 
548 -
        return make_err(errno);
 
549 -
    return {};
 
550 -
}
 
551 -

 
552 -
inline std::error_code
 
553 -
epoll_socket::set_option(
 
554 -
    int level, int optname, void const* data, std::size_t size) noexcept
 
555 -
{
 
556 -
    if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
 
557 -
        0)
 
558 -
        return make_err(errno);
 
559 -
    return {};
 
560 -
}
 
561 -

 
562 -
inline std::error_code
 
563 -
epoll_socket::get_option(
 
564 -
    int level, int optname, void* data, std::size_t* size) const noexcept
 
565 -
{
 
566 -
    socklen_t len = static_cast<socklen_t>(*size);
 
567 -
    if (::getsockopt(fd_, level, optname, data, &len) != 0)
 
568 -
        return make_err(errno);
 
569 -
    *size = static_cast<std::size_t>(len);
 
570 -
    return {};
 
571  
}
208  
}
572  

209  

573  
inline void
210  
inline void
574  
epoll_socket::cancel() noexcept
211  
epoll_socket::cancel() noexcept
575  
{
212  
{
576 -
    auto self = weak_from_this().lock();
213 +
    do_cancel();
577 -
    if (!self)
 
578 -
        return;
 
579 -

 
580 -
    conn_.request_cancel();
 
581 -
    rd_.request_cancel();
 
582 -
    wr_.request_cancel();
 
583 -

 
584 -
    epoll_op* conn_claimed = nullptr;
 
585 -
    epoll_op* rd_claimed   = nullptr;
 
586 -
    epoll_op* wr_claimed   = nullptr;
 
587 -
    {
 
588 -
        std::lock_guard lock(desc_state_.mutex);
 
589 -
        if (desc_state_.connect_op == &conn_)
 
590 -
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
 
591 -
        else
 
592 -
            desc_state_.connect_cancel_pending = true;
 
593 -
        if (desc_state_.read_op == &rd_)
 
594 -
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
 
595 -
        else
 
596 -
            desc_state_.read_cancel_pending = true;
 
597 -
        if (desc_state_.write_op == &wr_)
 
598 -
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
 
599 -
        else
 
600 -
            desc_state_.write_cancel_pending = true;
 
601 -
    }
 
602 -

 
603 -
    if (conn_claimed)
 
604 -
    {
 
605 -
        conn_.impl_ptr = self;
 
606 -
        svc_.post(&conn_);
 
607 -
        svc_.work_finished();
 
608 -
    }
 
609 -
    if (rd_claimed)
 
610 -
    {
 
611 -
        rd_.impl_ptr = self;
 
612 -
        svc_.post(&rd_);
 
613 -
        svc_.work_finished();
 
614 -
    }
 
615 -
    if (wr_claimed)
 
616 -
    {
 
617 -
        wr_.impl_ptr = self;
 
618 -
        svc_.post(&wr_);
 
619 -
        svc_.work_finished();
 
620 -
    }
 
621 -
}
 
622 -

 
623 -
inline void
 
624 -
epoll_socket::cancel_single_op(epoll_op& op) noexcept
 
625 -
{
 
626 -
    auto self = weak_from_this().lock();
 
627 -
    if (!self)
 
628 -
        return;
 
629 -

 
630 -
    op.request_cancel();
 
631 -

 
632 -
    epoll_op** desc_op_ptr = nullptr;
 
633 -
    if (&op == &conn_)
 
634 -
        desc_op_ptr = &desc_state_.connect_op;
 
635 -
    else if (&op == &rd_)
 
636 -
        desc_op_ptr = &desc_state_.read_op;
 
637 -
    else if (&op == &wr_)
 
638 -
        desc_op_ptr = &desc_state_.write_op;
 
639 -

 
640 -
    if (desc_op_ptr)
 
641 -
    {
 
642 -
        epoll_op* claimed = nullptr;
 
643 -
        {
 
644 -
            std::lock_guard lock(desc_state_.mutex);
 
645 -
            if (*desc_op_ptr == &op)
 
646 -
                claimed = std::exchange(*desc_op_ptr, nullptr);
 
647 -
            else if (&op == &conn_)
 
648 -
                desc_state_.connect_cancel_pending = true;
 
649 -
            else if (&op == &rd_)
 
650 -
                desc_state_.read_cancel_pending = true;
 
651 -
            else if (&op == &wr_)
 
652 -
                desc_state_.write_cancel_pending = true;
 
653 -
        }
 
654 -
        if (claimed)
 
655 -
        {
 
656 -
            op.impl_ptr = self;
 
657 -
            svc_.post(&op);
 
658 -
            svc_.work_finished();
 
659 -
        }
 
660 -
    }
 
661  
}
214  
}
662  

215  

663  
inline void
216  
inline void
664  
epoll_socket::close_socket() noexcept
217  
epoll_socket::close_socket() noexcept
665  
{
218  
{
666 -
    auto self = weak_from_this().lock();
219 +
    do_close_socket();
667 -
    if (self)
 
668 -
    {
 
669 -
        conn_.request_cancel();
 
670 -
        rd_.request_cancel();
 
671 -
        wr_.request_cancel();
 
672 -

 
673 -
        epoll_op* conn_claimed = nullptr;
 
674 -
        epoll_op* rd_claimed   = nullptr;
 
675 -
        epoll_op* wr_claimed   = nullptr;
 
676 -
        {
 
677 -
            std::lock_guard lock(desc_state_.mutex);
 
678 -
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
 
679 -
            rd_claimed   = std::exchange(desc_state_.read_op, nullptr);
 
680 -
            wr_claimed   = std::exchange(desc_state_.write_op, nullptr);
 
681 -
            desc_state_.read_ready             = false;
 
682 -
            desc_state_.write_ready            = false;
 
683 -
            desc_state_.read_cancel_pending    = false;
 
684 -
            desc_state_.write_cancel_pending   = false;
 
685 -
            desc_state_.connect_cancel_pending = false;
 
686 -
        }
 
687 -

 
688 -
        if (conn_claimed)
 
689 -
        {
 
690 -
            conn_.impl_ptr = self;
 
691 -
            svc_.post(&conn_);
 
692 -
            svc_.work_finished();
 
693 -
        }
 
694 -
        if (rd_claimed)
 
695 -
        {
 
696 -
            rd_.impl_ptr = self;
 
697 -
            svc_.post(&rd_);
 
698 -
            svc_.work_finished();
 
699 -
        }
 
700 -
        if (wr_claimed)
 
701 -
        {
 
702 -
            wr_.impl_ptr = self;
 
703 -
            svc_.post(&wr_);
 
704 -
            svc_.work_finished();
 
705 -
        }
 
706 -

 
707 -
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
 
708 -
            desc_state_.impl_ref_ = self;
 
709 -
    }
 
710 -

 
711 -
    if (fd_ >= 0)
 
712 -
    {
 
713 -
        if (desc_state_.registered_events != 0)
 
714 -
            svc_.scheduler().deregister_descriptor(fd_);
 
715 -
        ::close(fd_);
 
716 -
        fd_ = -1;
 
717 -
    }
 
718 -

 
719 -
    desc_state_.fd                = -1;
 
720 -
    desc_state_.registered_events = 0;
 
721 -

 
722 -
    local_endpoint_  = endpoint{};
 
723 -
    remote_endpoint_ = endpoint{};
 
724  
}
220  
}
725  

221  

726  
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
222  
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
727  
    : state_(
223  
    : state_(
728  
          std::make_unique<epoll_socket_state>(
224  
          std::make_unique<epoll_socket_state>(
729  
              ctx.use_service<epoll_scheduler>()))
225  
              ctx.use_service<epoll_scheduler>()))
730  
{
226  
{
731  
}
227  
}
732  

228  

733  
inline epoll_socket_service::~epoll_socket_service() {}
229  
inline epoll_socket_service::~epoll_socket_service() {}
734  

230  

735  
inline void
231  
inline void
736  
epoll_socket_service::shutdown()
232  
epoll_socket_service::shutdown()
737  
{
233  
{
738  
    std::lock_guard lock(state_->mutex_);
234  
    std::lock_guard lock(state_->mutex_);
739  

235  

740 -
    while (auto* impl = state_->socket_list_.pop_front())
236 +
    while (auto* impl = state_->impl_list_.pop_front())
741  
        impl->close_socket();
237  
        impl->close_socket();
742  

238  

743 -
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
239 +
    // Don't clear impl_ptrs_ here. The scheduler shuts down after us and
744  
    // drains completed_ops_, calling destroy() on each queued op. If we
240  
    // drains completed_ops_, calling destroy() on each queued op. If we
745  
    // released our shared_ptrs now, an epoll_op::destroy() could free the
241  
    // released our shared_ptrs now, an epoll_op::destroy() could free the
746  
    // last ref to an impl whose embedded descriptor_state is still linked
242  
    // last ref to an impl whose embedded descriptor_state is still linked
747  
    // in the queue — use-after-free on the next pop(). Letting ~state_
243  
    // in the queue — use-after-free on the next pop(). Letting ~state_
748  
    // release the ptrs (during service destruction, after scheduler
244  
    // release the ptrs (during service destruction, after scheduler
749  
    // shutdown) keeps every impl alive until all ops have been drained.
245  
    // shutdown) keeps every impl alive until all ops have been drained.
750  
}
246  
}
751  

247  

752  
inline io_object::implementation*
248  
inline io_object::implementation*
753  
epoll_socket_service::construct()
249  
epoll_socket_service::construct()
754  
{
250  
{
755  
    auto impl = std::make_shared<epoll_socket>(*this);
251  
    auto impl = std::make_shared<epoll_socket>(*this);
756  
    auto* raw = impl.get();
252  
    auto* raw = impl.get();
757  

253  

758  
    {
254  
    {
759  
        std::lock_guard lock(state_->mutex_);
255  
        std::lock_guard lock(state_->mutex_);
760 -
        state_->socket_list_.push_back(raw);
256 +
        state_->impl_ptrs_.emplace(raw, std::move(impl));
761 -
        state_->socket_ptrs_.emplace(raw, std::move(impl));
257 +
        state_->impl_list_.push_back(raw);
762  
    }
258  
    }
763  

259  

764  
    return raw;
260  
    return raw;
765  
}
261  
}
766  

262  

767  
inline void
263  
inline void
768  
epoll_socket_service::destroy(io_object::implementation* impl)
264  
epoll_socket_service::destroy(io_object::implementation* impl)
769  
{
265  
{
770  
    auto* epoll_impl = static_cast<epoll_socket*>(impl);
266  
    auto* epoll_impl = static_cast<epoll_socket*>(impl);
771  
    epoll_impl->close_socket();
267  
    epoll_impl->close_socket();
772  
    std::lock_guard lock(state_->mutex_);
268  
    std::lock_guard lock(state_->mutex_);
773 -
    state_->socket_list_.remove(epoll_impl);
269 +
    state_->impl_list_.remove(epoll_impl);
774 -
    state_->socket_ptrs_.erase(epoll_impl);
270 +
    state_->impl_ptrs_.erase(epoll_impl);
775  
}
271  
}
776  

272  

777  
inline std::error_code
273  
inline std::error_code
778  
epoll_socket_service::open_socket(
274  
epoll_socket_service::open_socket(
779  
    tcp_socket::implementation& impl, int family, int type, int protocol)
275  
    tcp_socket::implementation& impl, int family, int type, int protocol)
780  
{
276  
{
781  
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
277  
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
782  
    epoll_impl->close_socket();
278  
    epoll_impl->close_socket();
783  

279  

784  
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
280  
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
785  
    if (fd < 0)
281  
    if (fd < 0)
786  
        return make_err(errno);
282  
        return make_err(errno);
787  

283  

788  
    if (family == AF_INET6)
284  
    if (family == AF_INET6)
789  
    {
285  
    {
790  
        int one = 1;
286  
        int one = 1;
791  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
287  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
792  
    }
288  
    }
793  

289  

794  
    epoll_impl->fd_ = fd;
290  
    epoll_impl->fd_ = fd;
795  

291  

796  
    // Register fd with epoll (edge-triggered mode)
292  
    // Register fd with epoll (edge-triggered mode)
797  
    epoll_impl->desc_state_.fd = fd;
293  
    epoll_impl->desc_state_.fd = fd;
798  
    {
294  
    {
799  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
295  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
800  
        epoll_impl->desc_state_.read_op    = nullptr;
296  
        epoll_impl->desc_state_.read_op    = nullptr;
801  
        epoll_impl->desc_state_.write_op   = nullptr;
297  
        epoll_impl->desc_state_.write_op   = nullptr;
802  
        epoll_impl->desc_state_.connect_op = nullptr;
298  
        epoll_impl->desc_state_.connect_op = nullptr;
803  
    }
299  
    }
804  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
300  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
805  

301  

806  
    return {};
302  
    return {};
807  
}
303  
}
808  

304  

809  
inline void
305  
inline void
810  
epoll_socket_service::close(io_object::handle& h)
306  
epoll_socket_service::close(io_object::handle& h)
811  
{
307  
{
812  
    static_cast<epoll_socket*>(h.get())->close_socket();
308  
    static_cast<epoll_socket*>(h.get())->close_socket();
813  
}
309  
}
814  

310  

815  
inline void
311  
inline void
816 -
epoll_socket_service::post(epoll_op* op)
312 +
epoll_socket_service::post(scheduler_op* op)
817  
{
313  
{
818  
    state_->sched_.post(op);
314  
    state_->sched_.post(op);
819  
}
315  
}
820  

316  

821  
inline void
317  
inline void
822  
epoll_socket_service::work_started() noexcept
318  
epoll_socket_service::work_started() noexcept
823  
{
319  
{
824  
    state_->sched_.work_started();
320  
    state_->sched_.work_started();
825  
}
321  
}
826  

322  

827  
inline void
323  
inline void
828  
epoll_socket_service::work_finished() noexcept
324  
epoll_socket_service::work_finished() noexcept
829  
{
325  
{
830  
    state_->sched_.work_finished();
326  
    state_->sched_.work_finished();
831  
}
327  
}
832  

328  

833  
} // namespace boost::corosio::detail
329  
} // namespace boost::corosio::detail
834  

330  

835  
#endif // BOOST_COROSIO_HAS_EPOLL
331  
#endif // BOOST_COROSIO_HAS_EPOLL
836  

332  

837  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
333  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP