LCOV - code coverage report
Current view: top level - corosio/native/detail/select - select_socket_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 79.7 % 118 94 24
Test Date: 2026-03-13 23:11:04 Functions: 88.0 % 25 22 3

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_SELECT
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : #include <boost/corosio/detail/socket_service.hpp>
      20                 : 
      21                 : #include <boost/corosio/native/detail/select/select_socket.hpp>
      22                 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
      23                 : #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
      24                 : 
      25                 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
      26                 : 
      27                 : #include <coroutine>
      28                 : #include <mutex>
      29                 : #include <utility>
      30                 : 
      31                 : #include <errno.h>
      32                 : #include <fcntl.h>
      33                 : #include <netinet/in.h>
      34                 : #include <netinet/tcp.h>
      35                 : #include <sys/select.h>
      36                 : #include <sys/socket.h>
      37                 : #include <unistd.h>
      38                 : 
      39                 : /*
      40                 :     Each I/O op tries the syscall speculatively; only registers with
      41                 :     the reactor on EAGAIN. Fd is registered once at open time and
      42                 :     stays registered until close. The reactor only marks ready_events_;
      43                 :     actual I/O happens in invoke_deferred_io(). cancel() captures
      44                 :     shared_from_this() into op.impl_ptr to keep the impl alive.
      45                 : */
      46                 : 
      47                 : namespace boost::corosio::detail {
      48                 : 
      49                 : /// State for select socket service.
      50                 : using select_socket_state =
      51                 :     reactor_service_state<select_scheduler, select_socket>;
      52                 : 
      53                 : /** select socket service implementation.
      54                 : 
      55                 :     Inherits from socket_service to enable runtime polymorphism.
      56                 :     Uses key_type = socket_service for service lookup.
      57                 : */
      58                 : class BOOST_COROSIO_DECL select_socket_service final : public socket_service
      59                 : {
      60                 : public:
      61                 :     explicit select_socket_service(capy::execution_context& ctx);
      62                 :     ~select_socket_service() override;
      63                 : 
      64                 :     select_socket_service(select_socket_service const&)            = delete;
      65                 :     select_socket_service& operator=(select_socket_service const&) = delete;
      66                 : 
      67                 :     void shutdown() override;
      68                 : 
      69                 :     io_object::implementation* construct() override;
      70                 :     void destroy(io_object::implementation*) override;
      71                 :     void close(io_object::handle&) override;
      72                 :     std::error_code open_socket(
      73                 :         tcp_socket::implementation& impl,
      74                 :         int family,
      75                 :         int type,
      76                 :         int protocol) override;
      77                 : 
      78 HIT      292886 :     select_scheduler& scheduler() const noexcept
      79                 :     {
      80          292886 :         return state_->sched_;
      81                 :     }
      82                 :     void post(scheduler_op* op);
      83                 :     void work_started() noexcept;
      84                 :     void work_finished() noexcept;
      85                 : 
      86                 : private:
      87                 :     std::unique_ptr<select_socket_state> state_;
      88                 : };
      89                 : 
      90                 : inline void
      91 MIS           0 : select_connect_op::cancel() noexcept
      92                 : {
      93               0 :     if (socket_impl_)
      94               0 :         socket_impl_->cancel_single_op(*this);
      95                 :     else
      96               0 :         request_cancel();
      97               0 : }
      98                 : 
      99                 : inline void
     100 HIT          94 : select_read_op::cancel() noexcept
     101                 : {
     102              94 :     if (socket_impl_)
     103              94 :         socket_impl_->cancel_single_op(*this);
     104                 :     else
     105 MIS           0 :         request_cancel();
     106 HIT          94 : }
     107                 : 
     108                 : inline void
     109 MIS           0 : select_write_op::cancel() noexcept
     110                 : {
     111               0 :     if (socket_impl_)
     112               0 :         socket_impl_->cancel_single_op(*this);
     113                 :     else
     114               0 :         request_cancel();
     115               0 : }
     116                 : 
     117                 : inline void
     118 HIT       50611 : select_op::operator()()
     119                 : {
     120           50611 :     complete_io_op(*this);
     121           50611 : }
     122                 : 
     123                 : inline void
     124            3086 : select_connect_op::operator()()
     125                 : {
     126            3086 :     complete_connect_op(*this);
     127            3086 : }
     128                 : 
     129            9278 : inline select_socket::select_socket(select_socket_service& svc) noexcept
     130            9278 :     : reactor_socket(svc)
     131                 : {
     132            9278 : }
     133                 : 
     134            9278 : inline select_socket::~select_socket() = default;
     135                 : 
     136                 : inline std::coroutine_handle<>
     137            3086 : select_socket::connect(
     138                 :     std::coroutine_handle<> h,
     139                 :     capy::executor_ref ex,
     140                 :     endpoint ep,
     141                 :     std::stop_token token,
     142                 :     std::error_code* ec)
     143                 : {
     144            3086 :     auto result = do_connect(h, ex, ep, token, ec);
     145                 :     // Rebuild fd_sets so select() watches for writability
     146            3086 :     if (result == std::noop_coroutine())
     147            3086 :         svc_.scheduler().notify_reactor();
     148            3086 :     return result;
     149                 : }
     150                 : 
     151                 : inline std::coroutine_handle<>
     152          126277 : select_socket::read_some(
     153                 :     std::coroutine_handle<> h,
     154                 :     capy::executor_ref ex,
     155                 :     buffer_param param,
     156                 :     std::stop_token token,
     157                 :     std::error_code* ec,
     158                 :     std::size_t* bytes_out)
     159                 : {
     160          126277 :     return do_read_some(h, ex, param, token, ec, bytes_out);
     161                 : }
     162                 : 
     163                 : inline std::coroutine_handle<>
     164          126129 : select_socket::write_some(
     165                 :     std::coroutine_handle<> h,
     166                 :     capy::executor_ref ex,
     167                 :     buffer_param param,
     168                 :     std::stop_token token,
     169                 :     std::error_code* ec,
     170                 :     std::size_t* bytes_out)
     171                 : {
     172          126129 :     auto result = do_write_some(h, ex, param, token, ec, bytes_out);
     173                 :     // Rebuild fd_sets so select() watches for writability
     174          126129 :     if (result == std::noop_coroutine())
     175           25218 :         svc_.scheduler().notify_reactor();
     176          126129 :     return result;
     177                 : }
     178                 : 
     179                 : inline void
     180              94 : select_socket::cancel() noexcept
     181                 : {
     182              94 :     do_cancel();
     183              94 : }
     184                 : 
     185                 : inline void
     186           27842 : select_socket::close_socket() noexcept
     187                 : {
     188           27842 :     do_close_socket();
     189           27842 : }
     190                 : 
     191             168 : inline select_socket_service::select_socket_service(
     192             168 :     capy::execution_context& ctx)
     193             168 :     : state_(
     194                 :           std::make_unique<select_socket_state>(
     195             168 :               ctx.use_service<select_scheduler>()))
     196                 : {
     197             168 : }
     198                 : 
     199             336 : inline select_socket_service::~select_socket_service() {}
     200                 : 
     201                 : inline void
     202             168 : select_socket_service::shutdown()
     203                 : {
     204             168 :     std::lock_guard lock(state_->mutex_);
     205                 : 
     206             168 :     while (auto* impl = state_->impl_list_.pop_front())
     207 MIS           0 :         impl->close_socket();
     208                 : 
     209                 :     // Don't clear impl_ptrs_ here. The scheduler shuts down after us and
     210                 :     // drains completed_ops_, calling destroy() on each queued op. Letting
     211                 :     // ~state_ release the ptrs (during service destruction, after scheduler
     212                 :     // shutdown) keeps every impl alive until all ops have been drained.
     213 HIT         168 : }
     214                 : 
     215                 : inline io_object::implementation*
     216            9278 : select_socket_service::construct()
     217                 : {
     218            9278 :     auto impl = std::make_shared<select_socket>(*this);
     219            9278 :     auto* raw = impl.get();
     220                 : 
     221                 :     {
     222            9278 :         std::lock_guard lock(state_->mutex_);
     223            9278 :         state_->impl_ptrs_.emplace(raw, std::move(impl));
     224            9278 :         state_->impl_list_.push_back(raw);
     225            9278 :     }
     226                 : 
     227            9278 :     return raw;
     228            9278 : }
     229                 : 
     230                 : inline void
     231            9278 : select_socket_service::destroy(io_object::implementation* impl)
     232                 : {
     233            9278 :     auto* select_impl = static_cast<select_socket*>(impl);
     234            9278 :     select_impl->close_socket();
     235            9278 :     std::lock_guard lock(state_->mutex_);
     236            9278 :     state_->impl_list_.remove(select_impl);
     237            9278 :     state_->impl_ptrs_.erase(select_impl);
     238            9278 : }
     239                 : 
     240                 : inline std::error_code
     241            3101 : select_socket_service::open_socket(
     242                 :     tcp_socket::implementation& impl, int family, int type, int protocol)
     243                 : {
     244            3101 :     auto* select_impl = static_cast<select_socket*>(&impl);
     245            3101 :     select_impl->close_socket();
     246                 : 
     247            3101 :     int fd = ::socket(family, type, protocol);
     248            3101 :     if (fd < 0)
     249 MIS           0 :         return make_err(errno);
     250                 : 
     251 HIT        3101 :     if (family == AF_INET6)
     252                 :     {
     253               5 :         int one = 1;
     254               5 :         ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
     255                 :     }
     256                 : 
     257            3101 :     int flags = ::fcntl(fd, F_GETFL, 0);
     258            3101 :     if (flags == -1)
     259                 :     {
     260 MIS           0 :         int errn = errno;
     261               0 :         ::close(fd);
     262               0 :         return make_err(errn);
     263                 :     }
     264 HIT        3101 :     if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
     265                 :     {
     266 MIS           0 :         int errn = errno;
     267               0 :         ::close(fd);
     268               0 :         return make_err(errn);
     269                 :     }
     270 HIT        3101 :     if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
     271                 :     {
     272 MIS           0 :         int errn = errno;
     273               0 :         ::close(fd);
     274               0 :         return make_err(errn);
     275                 :     }
     276                 : 
     277 HIT        3101 :     if (fd >= FD_SETSIZE)
     278                 :     {
     279 MIS           0 :         ::close(fd);
     280               0 :         return make_err(EMFILE);
     281                 :     }
     282                 : 
     283                 : #ifdef SO_NOSIGPIPE
     284                 :     {
     285                 :         int one = 1;
     286                 :         ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
     287                 :     }
     288                 : #endif
     289                 : 
     290 HIT        3101 :     select_impl->fd_ = fd;
     291                 : 
     292            3101 :     select_impl->desc_state_.fd = fd;
     293                 :     {
     294            3101 :         std::lock_guard lock(select_impl->desc_state_.mutex);
     295            3101 :         select_impl->desc_state_.read_op    = nullptr;
     296            3101 :         select_impl->desc_state_.write_op   = nullptr;
     297            3101 :         select_impl->desc_state_.connect_op = nullptr;
     298            3101 :     }
     299            3101 :     scheduler().register_descriptor(fd, &select_impl->desc_state_);
     300                 : 
     301            3101 :     return {};
     302                 : }
     303                 : 
     304                 : inline void
     305           15463 : select_socket_service::close(io_object::handle& h)
     306                 : {
     307           15463 :     static_cast<select_socket*>(h.get())->close_socket();
     308           15463 : }
     309                 : 
     310                 : inline void
     311           50563 : select_socket_service::post(scheduler_op* op)
     312                 : {
     313           50563 :     state_->sched_.post(op);
     314           50563 : }
     315                 : 
     316                 : inline void
     317            3278 : select_socket_service::work_started() noexcept
     318                 : {
     319            3278 :     state_->sched_.work_started();
     320            3278 : }
     321                 : 
     322                 : inline void
     323             144 : select_socket_service::work_finished() noexcept
     324                 : {
     325             144 :     state_->sched_.work_finished();
     326             144 : }
     327                 : 
     328                 : } // namespace boost::corosio::detail
     329                 : 
     330                 : #endif // BOOST_COROSIO_HAS_SELECT
     331                 : 
     332                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
        

Generated by: LCOV version 2.3