1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_SELECT
15  
#if BOOST_COROSIO_HAS_SELECT
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
20  

20  

21  
#include <boost/corosio/native/detail/select/select_socket.hpp>
21  
#include <boost/corosio/native/detail/select/select_socket.hpp>
22  
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
22  
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
 
23 +
#include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
23  

24  

24 -
#include <boost/corosio/native/detail/endpoint_convert.hpp>
25 +
#include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
25 -
#include <boost/corosio/detail/dispatch_coro.hpp>
 
26 -
#include <boost/corosio/native/detail/make_err.hpp>
 
27 -

 
28 -
#include <boost/corosio/detail/except.hpp>
 
29  

26  

30 -
#include <boost/capy/buffers.hpp>
27 +
#include <coroutine>
 
28 +
#include <mutex>
 
29 +
#include <utility>
31  

30  

32  
#include <errno.h>
31  
#include <errno.h>
33  
#include <fcntl.h>
32  
#include <fcntl.h>
34  
#include <netinet/in.h>
33  
#include <netinet/in.h>
35  
#include <netinet/tcp.h>
34  
#include <netinet/tcp.h>
 
35 +
#include <sys/select.h>
36  
#include <sys/socket.h>
36  
#include <sys/socket.h>
37  
#include <unistd.h>
37  
#include <unistd.h>
38 -
#include <memory>
 
39 -
#include <mutex>
 
40 -
#include <unordered_map>
 
41 -

 
42  

38  

43  
/*
39  
/*
44 -
    select Socket Implementation
40 +
    Each I/O op tries the syscall speculatively; only registers with
45 -
    ============================
41 +
    the reactor on EAGAIN. Fd is registered once at open time and
46 -

42 +
    stays registered until close. The reactor only marks ready_events_;
47 -
    This mirrors the epoll_sockets design for behavioral consistency.
43 +
    actual I/O happens in invoke_deferred_io(). cancel() captures
48 -
    Each I/O operation follows the same pattern:
44 +
    shared_from_this() into op.impl_ptr to keep the impl alive.
49 -
      1. Try the syscall immediately (non-blocking socket)
 
50 -
      2. If it succeeds or fails with a real error, post to completion queue
 
51 -
      3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
 
52 -

 
53 -
    Cancellation
 
54 -
    ------------
 
55 -
    See op.hpp for the completion/cancellation race handling via the
 
56 -
    `registered` atomic. cancel() must complete pending operations (post
 
57 -
    them with cancelled flag) so coroutines waiting on them can resume.
 
58 -
    close_socket() calls cancel() first to ensure this.
 
59 -

 
60 -
    Impl Lifetime with shared_ptr
 
61 -
    -----------------------------
 
62 -
    Socket impls use enable_shared_from_this. The service owns impls via
 
63 -
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
 
64 -
    removal. When a user calls close(), we call cancel() which posts pending
 
65 -
    ops to the scheduler.
 
66 -

 
67 -
    CRITICAL: The posted ops must keep the impl alive until they complete.
 
68 -
    Otherwise the scheduler would process a freed op (use-after-free). The
 
69 -
    cancel() method captures shared_from_this() into op.impl_ptr before
 
70 -
    posting. When the op completes, impl_ptr is cleared, allowing the impl
 
71 -
    to be destroyed if no other references exist.
 
72 -

 
73 -
    Service Ownership
 
74 -
    -----------------
 
75 -
    select_socket_service owns all socket impls. destroy() removes the
 
76 -
    shared_ptr from the map, but the impl may survive if ops still hold
 
77 -
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
 
78 -
    in-flight ops will complete and release their refs.
 
79  
*/
45  
*/
80  

46  

81  
namespace boost::corosio::detail {
47  
namespace boost::corosio::detail {
82  

48  

83 -
/** State for select socket service. */
49 +
/// State for select socket service.
84 -
class select_socket_state
50 +
using select_socket_state =
85 -
{
51 +
    reactor_service_state<select_scheduler, select_socket>;
86 -
public:
 
87 -
    explicit select_socket_state(select_scheduler& sched) noexcept
 
88 -
        : sched_(sched)
 
89 -
    {
 
90 -
    }
 
91 -

 
92 -
    select_scheduler& sched_;
 
93 -
    std::mutex mutex_;
 
94 -
    intrusive_list<select_socket> socket_list_;
 
95 -
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
 
96 -
        socket_ptrs_;
 
97 -
};
 
98  

52  

99  
/** select socket service implementation.
53  
/** select socket service implementation.
100  

54  

101  
    Inherits from socket_service to enable runtime polymorphism.
55  
    Inherits from socket_service to enable runtime polymorphism.
102  
    Uses key_type = socket_service for service lookup.
56  
    Uses key_type = socket_service for service lookup.
103  
*/
57  
*/
104  
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
58  
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
105  
{
59  
{
106  
public:
60  
public:
107  
    explicit select_socket_service(capy::execution_context& ctx);
61  
    explicit select_socket_service(capy::execution_context& ctx);
108  
    ~select_socket_service() override;
62  
    ~select_socket_service() override;
109  

63  

110  
    select_socket_service(select_socket_service const&)            = delete;
64  
    select_socket_service(select_socket_service const&)            = delete;
111  
    select_socket_service& operator=(select_socket_service const&) = delete;
65  
    select_socket_service& operator=(select_socket_service const&) = delete;
112  

66  

113  
    void shutdown() override;
67  
    void shutdown() override;
114  

68  

115  
    io_object::implementation* construct() override;
69  
    io_object::implementation* construct() override;
116  
    void destroy(io_object::implementation*) override;
70  
    void destroy(io_object::implementation*) override;
117  
    void close(io_object::handle&) override;
71  
    void close(io_object::handle&) override;
118  
    std::error_code open_socket(
72  
    std::error_code open_socket(
119  
        tcp_socket::implementation& impl,
73  
        tcp_socket::implementation& impl,
120  
        int family,
74  
        int family,
121  
        int type,
75  
        int type,
122  
        int protocol) override;
76  
        int protocol) override;
123  

77  

124  
    select_scheduler& scheduler() const noexcept
78  
    select_scheduler& scheduler() const noexcept
125  
    {
79  
    {
126  
        return state_->sched_;
80  
        return state_->sched_;
127  
    }
81  
    }
128 -
    void post(select_op* op);
82 +
    void post(scheduler_op* op);
129  
    void work_started() noexcept;
83  
    void work_started() noexcept;
130  
    void work_finished() noexcept;
84  
    void work_finished() noexcept;
131  

85  

132  
private:
86  
private:
133  
    std::unique_ptr<select_socket_state> state_;
87  
    std::unique_ptr<select_socket_state> state_;
134  
};
88  
};
135 -
// Backward compatibility alias
 
136 -
using select_sockets = select_socket_service;
 
137 -

 
138 -
inline void
 
139 -
select_op::canceller::operator()() const noexcept
 
140 -
{
 
141 -
    op->cancel();
 
142 -
}
 
143 -

 
144  

89  

145  
inline void
90  
inline void
146  
select_connect_op::cancel() noexcept
91  
select_connect_op::cancel() noexcept
147  
{
92  
{
148  
    if (socket_impl_)
93  
    if (socket_impl_)
149  
        socket_impl_->cancel_single_op(*this);
94  
        socket_impl_->cancel_single_op(*this);
150  
    else
95  
    else
151  
        request_cancel();
96  
        request_cancel();
152  
}
97  
}
153  

98  

154  
inline void
99  
inline void
155  
select_read_op::cancel() noexcept
100  
select_read_op::cancel() noexcept
156  
{
101  
{
157  
    if (socket_impl_)
102  
    if (socket_impl_)
158  
        socket_impl_->cancel_single_op(*this);
103  
        socket_impl_->cancel_single_op(*this);
159  
    else
104  
    else
160  
        request_cancel();
105  
        request_cancel();
161  
}
106  
}
162  

107  

163  
inline void
108  
inline void
164  
select_write_op::cancel() noexcept
109  
select_write_op::cancel() noexcept
165  
{
110  
{
166  
    if (socket_impl_)
111  
    if (socket_impl_)
167  
        socket_impl_->cancel_single_op(*this);
112  
        socket_impl_->cancel_single_op(*this);
168  
    else
113  
    else
169  
        request_cancel();
114  
        request_cancel();
170  
}
115  
}
171  

116  

172  
inline void
117  
inline void
173 -
select_connect_op::operator()()
118 +
select_op::operator()()
174  
{
119  
{
175 -
    stop_cb.reset();
120 +
    complete_io_op(*this);
176 -

121 +
}
177 -
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
 
178 -

 
179 -
    // Cache endpoints on successful connect
 
180 -
    if (success && socket_impl_)
 
181 -
    {
 
182 -
        endpoint local_ep;
 
183 -
        sockaddr_storage local_storage{};
 
184 -
        socklen_t local_len = sizeof(local_storage);
 
185 -
        if (::getsockname(
 
186 -
                fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
 
187 -
            0)
 
188 -
            local_ep = from_sockaddr(local_storage);
 
189 -
        static_cast<select_socket*>(socket_impl_)
 
190 -
            ->set_endpoints(local_ep, target_endpoint);
 
191 -
    }
 
192 -

 
193 -
    if (ec_out)
 
194 -
    {
 
195 -
        if (cancelled.load(std::memory_order_acquire))
 
196 -
            *ec_out = capy::error::canceled;
 
197 -
        else if (errn != 0)
 
198 -
            *ec_out = make_err(errn);
 
199 -
        else
 
200 -
            *ec_out = {};
 
201 -
    }
 
202 -

 
203 -
    if (bytes_out)
 
204 -
        *bytes_out = bytes_transferred;
 
205  

122  

206 -
    // Move to stack before destroying the frame
123 +
inline void
207 -
    capy::executor_ref saved_ex(ex);
124 +
select_connect_op::operator()()
208 -
    std::coroutine_handle<> saved_h(h);
125 +
{
209 -
    impl_ptr.reset();
126 +
    complete_connect_op(*this);
210 -
    dispatch_coro(saved_ex, saved_h).resume();
 
211  
}
127  
}
212  

128  

213  
inline select_socket::select_socket(select_socket_service& svc) noexcept
129  
inline select_socket::select_socket(select_socket_service& svc) noexcept
214 -
    : svc_(svc)
130 +
    : reactor_socket(svc)
215  
{
131  
{
216  
}
132  
}
217  

133  

 
134 +
inline select_socket::~select_socket() = default;
 
135 +

218  
inline std::coroutine_handle<>
136  
inline std::coroutine_handle<>
219  
select_socket::connect(
137  
select_socket::connect(
220  
    std::coroutine_handle<> h,
138  
    std::coroutine_handle<> h,
221  
    capy::executor_ref ex,
139  
    capy::executor_ref ex,
222  
    endpoint ep,
140  
    endpoint ep,
223  
    std::stop_token token,
141  
    std::stop_token token,
224  
    std::error_code* ec)
142  
    std::error_code* ec)
225  
{
143  
{
226 -
    auto& op = conn_;
144 +
    auto result = do_connect(h, ex, ep, token, ec);
227 -
    op.reset();
145 +
    // Rebuild fd_sets so select() watches for writability
228 -
    op.h               = h;
146 +
    if (result == std::noop_coroutine())
229 -
    op.ex              = ex;
147 +
        svc_.scheduler().notify_reactor();
230 -
    op.ec_out          = ec;
148 +
    return result;
231 -
    op.fd              = fd_;
 
232 -
    op.target_endpoint = ep; // Store target for endpoint caching
 
233 -
    op.start(token, this);
 
234 -

 
235 -
    sockaddr_storage storage{};
 
236 -
    socklen_t addrlen =
 
237 -
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
 
238 -
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
 
239 -

 
240 -
    if (result == 0)
 
241 -
    {
 
242 -
        // Sync success — cache endpoints immediately
 
243 -
        sockaddr_storage local_storage{};
 
244 -
        socklen_t local_len = sizeof(local_storage);
 
245 -
        if (::getsockname(
 
246 -
                fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
 
247 -
            0)
 
248 -
            local_endpoint_ = detail::from_sockaddr(local_storage);
 
249 -
        remote_endpoint_ = ep;
 
250 -

 
251 -
        op.complete(0, 0);
 
252 -
        op.impl_ptr = shared_from_this();
 
253 -
        svc_.post(&op);
 
254 -
        // completion is always posted to scheduler queue, never inline.
 
255 -
        return std::noop_coroutine();
 
256 -
    }
 
257 -

 
258 -
    if (errno == EINPROGRESS)
 
259 -
    {
 
260 -
        svc_.work_started();
 
261 -
        op.impl_ptr = shared_from_this();
 
262 -

 
263 -
        // Set registering BEFORE register_fd to close the race window where
 
264 -
        // reactor sees an event before we set registered. The reactor treats
 
265 -
        // registering the same as registered when claiming the op.
 
266 -
        op.registered.store(
 
267 -
            select_registration_state::registering, std::memory_order_release);
 
268 -
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
 
269 -

 
270 -
        // Transition to registered. If this fails, reactor or cancel already
 
271 -
        // claimed the op (state is now unregistered), so we're done. However,
 
272 -
        // we must still deregister the fd because cancel's deregister_fd may
 
273 -
        // have run before our register_fd, leaving the fd orphaned.
 
274 -
        auto expected = select_registration_state::registering;
 
275 -
        if (!op.registered.compare_exchange_strong(
 
276 -
                expected, select_registration_state::registered,
 
277 -
                std::memory_order_acq_rel))
 
278 -
        {
 
279 -
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
 
280 -
            // completion is always posted to scheduler queue, never inline.
 
281 -
            return std::noop_coroutine();
 
282 -
        }
 
283 -

 
284 -
        // If cancelled was set before we registered, handle it now.
 
285 -
        if (op.cancelled.load(std::memory_order_acquire))
 
286 -
        {
 
287 -
            auto prev = op.registered.exchange(
 
288 -
                select_registration_state::unregistered,
 
289 -
                std::memory_order_acq_rel);
 
290 -
            if (prev != select_registration_state::unregistered)
 
291 -
            {
 
292 -
                svc_.scheduler().deregister_fd(
 
293 -
                    fd_, select_scheduler::event_write);
 
294 -
                op.impl_ptr = shared_from_this();
 
295 -
                svc_.post(&op);
 
296 -
                svc_.work_finished();
 
297 -
            }
 
298 -
        }
 
299 -
        // completion is always posted to scheduler queue, never inline.
 
300 -
        return std::noop_coroutine();
 
301 -
    }
 
302 -

 
303 -
    op.complete(errno, 0);
 
304 -
    op.impl_ptr = shared_from_this();
 
305 -
    svc_.post(&op);
 
306 -
    // completion is always posted to scheduler queue, never inline.
 
307 -
    return std::noop_coroutine();
 
308  
}
149  
}
309  

150  

310  
inline std::coroutine_handle<>
151  
inline std::coroutine_handle<>
311  
select_socket::read_some(
152  
select_socket::read_some(
312  
    std::coroutine_handle<> h,
153  
    std::coroutine_handle<> h,
313  
    capy::executor_ref ex,
154  
    capy::executor_ref ex,
314  
    buffer_param param,
155  
    buffer_param param,
315  
    std::stop_token token,
156  
    std::stop_token token,
316  
    std::error_code* ec,
157  
    std::error_code* ec,
317  
    std::size_t* bytes_out)
158  
    std::size_t* bytes_out)
318  
{
159  
{
319 -
    auto& op = rd_;
160 +
    return do_read_some(h, ex, param, token, ec, bytes_out);
320 -
    op.reset();
 
321 -
    op.h         = h;
 
322 -
    op.ex        = ex;
 
323 -
    op.ec_out    = ec;
 
324 -
    op.bytes_out = bytes_out;
 
325 -
    op.fd        = fd_;
 
326 -
    op.start(token, this);
 
327 -

 
328 -
    capy::mutable_buffer bufs[select_read_op::max_buffers];
 
329 -
    op.iovec_count =
 
330 -
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
 
331 -

 
332 -
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
 
333 -
    {
 
334 -
        op.empty_buffer_read = true;
 
335 -
        op.complete(0, 0);
 
336 -
        op.impl_ptr = shared_from_this();
 
337 -
        svc_.post(&op);
 
338 -
        return std::noop_coroutine();
 
339 -
    }
 
340 -

 
341 -
    for (int i = 0; i < op.iovec_count; ++i)
 
342 -
    {
 
343 -
        op.iovecs[i].iov_base = bufs[i].data();
 
344 -
        op.iovecs[i].iov_len  = bufs[i].size();
 
345 -
    }
 
346 -

 
347 -
    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
 
348 -

 
349 -
    if (n > 0)
 
350 -
    {
 
351 -
        op.complete(0, static_cast<std::size_t>(n));
 
352 -
        op.impl_ptr = shared_from_this();
 
353 -
        svc_.post(&op);
 
354 -
        return std::noop_coroutine();
 
355 -
    }
 
356 -

 
357 -
    if (n == 0)
 
358 -
    {
 
359 -
        op.complete(0, 0);
 
360 -
        op.impl_ptr = shared_from_this();
 
361 -
        svc_.post(&op);
 
362 -
        return std::noop_coroutine();
 
363 -
    }
 
364 -

 
365 -
    if (errno == EAGAIN || errno == EWOULDBLOCK)
 
366 -
    {
 
367 -
        svc_.work_started();
 
368 -
        op.impl_ptr = shared_from_this();
 
369 -

 
370 -
        // Set registering BEFORE register_fd to close the race window where
 
371 -
        // reactor sees an event before we set registered.
 
372 -
        op.registered.store(
 
373 -
            select_registration_state::registering, std::memory_order_release);
 
374 -
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
 
375 -

 
376 -
        // Transition to registered. If this fails, reactor or cancel already
 
377 -
        // claimed the op (state is now unregistered), so we're done. However,
 
378 -
        // we must still deregister the fd because cancel's deregister_fd may
 
379 -
        // have run before our register_fd, leaving the fd orphaned.
 
380 -
        auto expected = select_registration_state::registering;
 
381 -
        if (!op.registered.compare_exchange_strong(
 
382 -
                expected, select_registration_state::registered,
 
383 -
                std::memory_order_acq_rel))
 
384 -
        {
 
385 -
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
 
386 -
            return std::noop_coroutine();
 
387 -
        }
 
388 -

 
389 -
        // If cancelled was set before we registered, handle it now.
 
390 -
        if (op.cancelled.load(std::memory_order_acquire))
 
391 -
        {
 
392 -
            auto prev = op.registered.exchange(
 
393 -
                select_registration_state::unregistered,
 
394 -
                std::memory_order_acq_rel);
 
395 -
            if (prev != select_registration_state::unregistered)
 
396 -
            {
 
397 -
                svc_.scheduler().deregister_fd(
 
398 -
                    fd_, select_scheduler::event_read);
 
399 -
                op.impl_ptr = shared_from_this();
 
400 -
                svc_.post(&op);
 
401 -
                svc_.work_finished();
 
402 -
            }
 
403 -
        }
 
404 -
        return std::noop_coroutine();
 
405 -
    }
 
406 -

 
407 -
    op.complete(errno, 0);
 
408 -
    op.impl_ptr = shared_from_this();
 
409 -
    svc_.post(&op);
 
410 -
    return std::noop_coroutine();
 
411  
}
161  
}
412  

162  

413  
inline std::coroutine_handle<>
163  
inline std::coroutine_handle<>
414  
select_socket::write_some(
164  
select_socket::write_some(
415  
    std::coroutine_handle<> h,
165  
    std::coroutine_handle<> h,
416  
    capy::executor_ref ex,
166  
    capy::executor_ref ex,
417  
    buffer_param param,
167  
    buffer_param param,
418  
    std::stop_token token,
168  
    std::stop_token token,
419  
    std::error_code* ec,
169  
    std::error_code* ec,
420  
    std::size_t* bytes_out)
170  
    std::size_t* bytes_out)
421  
{
171  
{
422 -
    auto& op = wr_;
172 +
    auto result = do_write_some(h, ex, param, token, ec, bytes_out);
423 -
    op.reset();
173 +
    // Rebuild fd_sets so select() watches for writability
424 -
    op.h         = h;
174 +
    if (result == std::noop_coroutine())
425 -
    op.ex        = ex;
175 +
        svc_.scheduler().notify_reactor();
426 -
    op.ec_out    = ec;
176 +
    return result;
427 -
    op.bytes_out = bytes_out;
 
428 -
    op.fd        = fd_;
 
429 -
    op.start(token, this);
 
430 -

 
431 -
    capy::mutable_buffer bufs[select_write_op::max_buffers];
 
432 -
    op.iovec_count =
 
433 -
        static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));
 
434 -

 
435 -
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
 
436 -
    {
 
437 -
        op.complete(0, 0);
 
438 -
        op.impl_ptr = shared_from_this();
 
439 -
        svc_.post(&op);
 
440 -
        return std::noop_coroutine();
 
441 -
    }
 
442 -

 
443 -
    for (int i = 0; i < op.iovec_count; ++i)
 
444 -
    {
 
445 -
        op.iovecs[i].iov_base = bufs[i].data();
 
446 -
        op.iovecs[i].iov_len  = bufs[i].size();
 
447 -
    }
 
448 -

 
449 -
    msghdr msg{};
 
450 -
    msg.msg_iov    = op.iovecs;
 
451 -
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
 
452 -

 
453 -
    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
 
454 -

 
455 -
    if (n > 0)
 
456 -
    {
 
457 -
        op.complete(0, static_cast<std::size_t>(n));
 
458 -
        op.impl_ptr = shared_from_this();
 
459 -
        svc_.post(&op);
 
460 -
        return std::noop_coroutine();
 
461 -
    }
 
462 -

 
463 -
    if (errno == EAGAIN || errno == EWOULDBLOCK)
 
464 -
    {
 
465 -
        svc_.work_started();
 
466 -
        op.impl_ptr = shared_from_this();
 
467 -

 
468 -
        // Set registering BEFORE register_fd to close the race window where
 
469 -
        // reactor sees an event before we set registered.
 
470 -
        op.registered.store(
 
471 -
            select_registration_state::registering, std::memory_order_release);
 
472 -
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
 
473 -

 
474 -
        // Transition to registered. If this fails, reactor or cancel already
 
475 -
        // claimed the op (state is now unregistered), so we're done. However,
 
476 -
        // we must still deregister the fd because cancel's deregister_fd may
 
477 -
        // have run before our register_fd, leaving the fd orphaned.
 
478 -
        auto expected = select_registration_state::registering;
 
479 -
        if (!op.registered.compare_exchange_strong(
 
480 -
                expected, select_registration_state::registered,
 
481 -
                std::memory_order_acq_rel))
 
482 -
        {
 
483 -
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
 
484 -
            return std::noop_coroutine();
 
485 -
        }
 
486 -

 
487 -
        // If cancelled was set before we registered, handle it now.
 
488 -
        if (op.cancelled.load(std::memory_order_acquire))
 
489 -
        {
 
490 -
            auto prev = op.registered.exchange(
 
491 -
                select_registration_state::unregistered,
 
492 -
                std::memory_order_acq_rel);
 
493 -
            if (prev != select_registration_state::unregistered)
 
494 -
            {
 
495 -
                svc_.scheduler().deregister_fd(
 
496 -
                    fd_, select_scheduler::event_write);
 
497 -
                op.impl_ptr = shared_from_this();
 
498 -
                svc_.post(&op);
 
499 -
                svc_.work_finished();
 
500 -
            }
 
501 -
        }
 
502 -
        return std::noop_coroutine();
 
503 -
    }
 
504 -

 
505 -
    op.complete(errno ? errno : EIO, 0);
 
506 -
    op.impl_ptr = shared_from_this();
 
507 -
    svc_.post(&op);
 
508 -
    return std::noop_coroutine();
 
509 -
}
 
510 -

 
511 -
inline std::error_code
 
512 -
select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
 
513 -
{
 
514 -
    int how;
 
515 -
    switch (what)
 
516 -
    {
 
517 -
    case tcp_socket::shutdown_receive:
 
518 -
        how = SHUT_RD;
 
519 -
        break;
 
520 -
    case tcp_socket::shutdown_send:
 
521 -
        how = SHUT_WR;
 
522 -
        break;
 
523 -
    case tcp_socket::shutdown_both:
 
524 -
        how = SHUT_RDWR;
 
525 -
        break;
 
526 -
    default:
 
527 -
        return make_err(EINVAL);
 
528 -
    }
 
529 -
    if (::shutdown(fd_, how) != 0)
 
530 -
        return make_err(errno);
 
531 -
    return {};
 
532 -
}
 
533 -

 
534 -
inline std::error_code
 
535 -
select_socket::set_option(
 
536 -
    int level, int optname, void const* data, std::size_t size) noexcept
 
537 -
{
 
538 -
    if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
 
539 -
        0)
 
540 -
        return make_err(errno);
 
541 -
    return {};
 
542 -
}
 
543 -

 
544 -
inline std::error_code
 
545 -
select_socket::get_option(
 
546 -
    int level, int optname, void* data, std::size_t* size) const noexcept
 
547 -
{
 
548 -
    socklen_t len = static_cast<socklen_t>(*size);
 
549 -
    if (::getsockopt(fd_, level, optname, data, &len) != 0)
 
550 -
        return make_err(errno);
 
551 -
    *size = static_cast<std::size_t>(len);
 
552 -
    return {};
 
553  
}
177  
}
554  

178  

555  
inline void
179  
inline void
556  
select_socket::cancel() noexcept
180  
select_socket::cancel() noexcept
557  
{
181  
{
558 -
    auto self = weak_from_this().lock();
182 +
    do_cancel();
559 -
    if (!self)
 
560 -
        return;
 
561 -

 
562 -
    auto cancel_op = [this, &self](select_op& op, int events) {
 
563 -
        auto prev = op.registered.exchange(
 
564 -
            select_registration_state::unregistered, std::memory_order_acq_rel);
 
565 -
        op.request_cancel();
 
566 -
        if (prev != select_registration_state::unregistered)
 
567 -
        {
 
568 -
            svc_.scheduler().deregister_fd(fd_, events);
 
569 -
            op.impl_ptr = self;
 
570 -
            svc_.post(&op);
 
571 -
            svc_.work_finished();
 
572 -
        }
 
573 -
    };
 
574 -

 
575 -
    cancel_op(conn_, select_scheduler::event_write);
 
576 -
    cancel_op(rd_, select_scheduler::event_read);
 
577 -
    cancel_op(wr_, select_scheduler::event_write);
 
578 -
}
 
579 -

 
580 -
inline void
 
581 -
select_socket::cancel_single_op(select_op& op) noexcept
 
582 -
{
 
583 -
    auto self = weak_from_this().lock();
 
584 -
    if (!self)
 
585 -
        return;
 
586 -

 
587 -
    // Called from stop_token callback to cancel a specific pending operation.
 
588 -
    auto prev = op.registered.exchange(
 
589 -
        select_registration_state::unregistered, std::memory_order_acq_rel);
 
590 -
    op.request_cancel();
 
591 -

 
592 -
    if (prev != select_registration_state::unregistered)
 
593 -
    {
 
594 -
        // Determine which event type to deregister
 
595 -
        int events = 0;
 
596 -
        if (&op == &conn_ || &op == &wr_)
 
597 -
            events = select_scheduler::event_write;
 
598 -
        else if (&op == &rd_)
 
599 -
            events = select_scheduler::event_read;
 
600 -

 
601 -
        svc_.scheduler().deregister_fd(fd_, events);
 
602 -

 
603 -
        op.impl_ptr = self;
 
604 -
        svc_.post(&op);
 
605 -
        svc_.work_finished();
 
606 -
    }
 
607  
}
183  
}
608  

184  

609  
inline void
185  
inline void
610  
select_socket::close_socket() noexcept
186  
select_socket::close_socket() noexcept
611  
{
187  
{
612 -
    auto self = weak_from_this().lock();
188 +
    do_close_socket();
613 -
    if (self)
 
614 -
    {
 
615 -
        auto cancel_op = [this, &self](select_op& op, int events) {
 
616 -
            auto prev = op.registered.exchange(
 
617 -
                select_registration_state::unregistered,
 
618 -
                std::memory_order_acq_rel);
 
619 -
            op.request_cancel();
 
620 -
            if (prev != select_registration_state::unregistered)
 
621 -
            {
 
622 -
                svc_.scheduler().deregister_fd(fd_, events);
 
623 -
                op.impl_ptr = self;
 
624 -
                svc_.post(&op);
 
625 -
                svc_.work_finished();
 
626 -
            }
 
627 -
        };
 
628 -

 
629 -
        cancel_op(conn_, select_scheduler::event_write);
 
630 -
        cancel_op(rd_, select_scheduler::event_read);
 
631 -
        cancel_op(wr_, select_scheduler::event_write);
 
632 -
    }
 
633 -

 
634 -
    if (fd_ >= 0)
 
635 -
    {
 
636 -
        svc_.scheduler().deregister_fd(
 
637 -
            fd_, select_scheduler::event_read | select_scheduler::event_write);
 
638 -
        ::close(fd_);
 
639 -
        fd_ = -1;
 
640 -
    }
 
641 -

 
642 -
    local_endpoint_  = endpoint{};
 
643 -
    remote_endpoint_ = endpoint{};
 
644  
}
189  
}
645  

190  

646  
inline select_socket_service::select_socket_service(
191  
inline select_socket_service::select_socket_service(
647  
    capy::execution_context& ctx)
192  
    capy::execution_context& ctx)
648  
    : state_(
193  
    : state_(
649  
          std::make_unique<select_socket_state>(
194  
          std::make_unique<select_socket_state>(
650  
              ctx.use_service<select_scheduler>()))
195  
              ctx.use_service<select_scheduler>()))
651  
{
196  
{
652  
}
197  
}
653  

198  

654  
inline select_socket_service::~select_socket_service() {}
199  
inline select_socket_service::~select_socket_service() {}
655  

200  

656  
inline void
201  
inline void
657  
select_socket_service::shutdown()
202  
select_socket_service::shutdown()
658  
{
203  
{
659  
    std::lock_guard lock(state_->mutex_);
204  
    std::lock_guard lock(state_->mutex_);
660  

205  

661 -
    while (auto* impl = state_->socket_list_.pop_front())
206 +
    while (auto* impl = state_->impl_list_.pop_front())
662  
        impl->close_socket();
207  
        impl->close_socket();
663  

208  

664 -
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
209 +
    // Don't clear impl_ptrs_ here. The scheduler shuts down after us and
665  
    // drains completed_ops_, calling destroy() on each queued op. Letting
210  
    // drains completed_ops_, calling destroy() on each queued op. Letting
666  
    // ~state_ release the ptrs (during service destruction, after scheduler
211  
    // ~state_ release the ptrs (during service destruction, after scheduler
667  
    // shutdown) keeps every impl alive until all ops have been drained.
212  
    // shutdown) keeps every impl alive until all ops have been drained.
668  
}
213  
}
669  

214  

670  
inline io_object::implementation*
215  
inline io_object::implementation*
671  
select_socket_service::construct()
216  
select_socket_service::construct()
672  
{
217  
{
673  
    auto impl = std::make_shared<select_socket>(*this);
218  
    auto impl = std::make_shared<select_socket>(*this);
674  
    auto* raw = impl.get();
219  
    auto* raw = impl.get();
675  

220  

676  
    {
221  
    {
677  
        std::lock_guard lock(state_->mutex_);
222  
        std::lock_guard lock(state_->mutex_);
678 -
        state_->socket_list_.push_back(raw);
223 +
        state_->impl_ptrs_.emplace(raw, std::move(impl));
679 -
        state_->socket_ptrs_.emplace(raw, std::move(impl));
224 +
        state_->impl_list_.push_back(raw);
680  
    }
225  
    }
681  

226  

682  
    return raw;
227  
    return raw;
683  
}
228  
}
684  

229  

685  
inline void
230  
inline void
686  
select_socket_service::destroy(io_object::implementation* impl)
231  
select_socket_service::destroy(io_object::implementation* impl)
687  
{
232  
{
688  
    auto* select_impl = static_cast<select_socket*>(impl);
233  
    auto* select_impl = static_cast<select_socket*>(impl);
689  
    select_impl->close_socket();
234  
    select_impl->close_socket();
690  
    std::lock_guard lock(state_->mutex_);
235  
    std::lock_guard lock(state_->mutex_);
691 -
    state_->socket_list_.remove(select_impl);
236 +
    state_->impl_list_.remove(select_impl);
692 -
    state_->socket_ptrs_.erase(select_impl);
237 +
    state_->impl_ptrs_.erase(select_impl);
693  
}
238  
}
694  

239  

695  
inline std::error_code
240  
inline std::error_code
696  
select_socket_service::open_socket(
241  
select_socket_service::open_socket(
697  
    tcp_socket::implementation& impl, int family, int type, int protocol)
242  
    tcp_socket::implementation& impl, int family, int type, int protocol)
698  
{
243  
{
699  
    auto* select_impl = static_cast<select_socket*>(&impl);
244  
    auto* select_impl = static_cast<select_socket*>(&impl);
700  
    select_impl->close_socket();
245  
    select_impl->close_socket();
701  

246  

702  
    int fd = ::socket(family, type, protocol);
247  
    int fd = ::socket(family, type, protocol);
703  
    if (fd < 0)
248  
    if (fd < 0)
704  
        return make_err(errno);
249  
        return make_err(errno);
705  

250  

706  
    if (family == AF_INET6)
251  
    if (family == AF_INET6)
707  
    {
252  
    {
708  
        int one = 1;
253  
        int one = 1;
709  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
254  
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
710  
    }
255  
    }
711 -
    // Set non-blocking and close-on-exec
 
712  

256  

713  
    int flags = ::fcntl(fd, F_GETFL, 0);
257  
    int flags = ::fcntl(fd, F_GETFL, 0);
714  
    if (flags == -1)
258  
    if (flags == -1)
715  
    {
259  
    {
716  
        int errn = errno;
260  
        int errn = errno;
717  
        ::close(fd);
261  
        ::close(fd);
718  
        return make_err(errn);
262  
        return make_err(errn);
719  
    }
263  
    }
720  
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
264  
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
721  
    {
265  
    {
722  
        int errn = errno;
266  
        int errn = errno;
723  
        ::close(fd);
267  
        ::close(fd);
724  
        return make_err(errn);
268  
        return make_err(errn);
725  
    }
269  
    }
726  
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
270  
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
727  
    {
271  
    {
728  
        int errn = errno;
272  
        int errn = errno;
729  
        ::close(fd);
273  
        ::close(fd);
730  
        return make_err(errn);
274  
        return make_err(errn);
731  
    }
275  
    }
732 -
    // Check fd is within select() limits
 
733  

276  

734  
    if (fd >= FD_SETSIZE)
277  
    if (fd >= FD_SETSIZE)
735  
    {
278  
    {
736  
        ::close(fd);
279  
        ::close(fd);
737 -
        return make_err(EMFILE); // Too many open files
280 +
        return make_err(EMFILE);
738  
    }
281  
    }
739  

282  

 
283 +
#ifdef SO_NOSIGPIPE
 
284 +
    {
 
285 +
        int one = 1;
 
286 +
        ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
 
287 +
    }
 
288 +
#endif
 
289 +

740  
    select_impl->fd_ = fd;
290  
    select_impl->fd_ = fd;
 
291 +

 
292 +
    select_impl->desc_state_.fd = fd;
 
293 +
    {
 
294 +
        std::lock_guard lock(select_impl->desc_state_.mutex);
 
295 +
        select_impl->desc_state_.read_op    = nullptr;
 
296 +
        select_impl->desc_state_.write_op   = nullptr;
 
297 +
        select_impl->desc_state_.connect_op = nullptr;
 
298 +
    }
 
299 +
    scheduler().register_descriptor(fd, &select_impl->desc_state_);
 
300 +

741  
    return {};
301  
    return {};
742  
}
302  
}
743  

303  

744  
inline void
304  
inline void
745  
select_socket_service::close(io_object::handle& h)
305  
select_socket_service::close(io_object::handle& h)
746  
{
306  
{
747  
    static_cast<select_socket*>(h.get())->close_socket();
307  
    static_cast<select_socket*>(h.get())->close_socket();
748  
}
308  
}
749  

309  

750  
inline void
310  
inline void
751 -
select_socket_service::post(select_op* op)
311 +
select_socket_service::post(scheduler_op* op)
752  
{
312  
{
753  
    state_->sched_.post(op);
313  
    state_->sched_.post(op);
754  
}
314  
}
755  

315  

756  
inline void
316  
inline void
757  
select_socket_service::work_started() noexcept
317  
select_socket_service::work_started() noexcept
758  
{
318  
{
759  
    state_->sched_.work_started();
319  
    state_->sched_.work_started();
760  
}
320  
}
761  

321  

762  
inline void
322  
inline void
763  
select_socket_service::work_finished() noexcept
323  
select_socket_service::work_finished() noexcept
764  
{
324  
{
765  
    state_->sched_.work_finished();
325  
    state_->sched_.work_finished();
766  
}
326  
}
767  

327  

768  
} // namespace boost::corosio::detail
328  
} // namespace boost::corosio::detail
769  

329  

770  
#endif // BOOST_COROSIO_HAS_SELECT
330  
#endif // BOOST_COROSIO_HAS_SELECT
771  

331  

772  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
332  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP