LCOV - code coverage report
Current view: top level - corosio/native/detail/select - select_acceptor_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 61.9 % 189 117 72
Test Date: 2026-03-13 23:11:04 Functions: 95.2 % 21 20 1

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_SELECT
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : #include <boost/corosio/detail/acceptor_service.hpp>
      20                 : 
      21                 : #include <boost/corosio/native/detail/select/select_acceptor.hpp>
      22                 : #include <boost/corosio/native/detail/select/select_socket_service.hpp>
      23                 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
      24                 : #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
      25                 : 
      26                 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
      27                 : 
      28                 : #include <memory>
      29                 : #include <mutex>
      30                 : #include <utility>
      31                 : 
      32                 : #include <errno.h>
      33                 : #include <fcntl.h>
      34                 : #include <netinet/in.h>
      35                 : #include <sys/select.h>
      36                 : #include <sys/socket.h>
      37                 : #include <unistd.h>
      38                 : 
      39                 : namespace boost::corosio::detail {
      40                 : 
      41                 : /// State for select acceptor service.
      42                 : using select_acceptor_state =
      43                 :     reactor_service_state<select_scheduler, select_acceptor>;
      44                 : 
      45                 : /** select acceptor service implementation.
      46                 : 
      47                 :     Inherits from acceptor_service to enable runtime polymorphism.
      48                 :     Uses key_type = acceptor_service for service lookup.
      49                 : */
      50                 : class BOOST_COROSIO_DECL select_acceptor_service final : public acceptor_service
      51                 : {
      52                 : public:
      53                 :     explicit select_acceptor_service(capy::execution_context& ctx);
      54                 :     ~select_acceptor_service() override;
      55                 : 
      56                 :     select_acceptor_service(select_acceptor_service const&)            = delete;
      57                 :     select_acceptor_service& operator=(select_acceptor_service const&) = delete;
      58                 : 
      59                 :     void shutdown() override;
      60                 : 
      61                 :     io_object::implementation* construct() override;
      62                 :     void destroy(io_object::implementation*) override;
      63                 :     void close(io_object::handle&) override;
      64                 :     std::error_code open_acceptor_socket(
      65                 :         tcp_acceptor::implementation& impl,
      66                 :         int family,
      67                 :         int type,
      68                 :         int protocol) override;
      69                 :     std::error_code
      70                 :     bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
      71                 :     std::error_code
      72                 :     listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
      73                 : 
      74 HIT         116 :     select_scheduler& scheduler() const noexcept
      75                 :     {
      76             116 :         return state_->sched_;
      77                 :     }
      78                 :     void post(scheduler_op* op);
      79                 :     void work_started() noexcept;
      80                 :     void work_finished() noexcept;
      81                 : 
      82                 :     /** Get the socket service for creating peer sockets during accept. */
      83                 :     select_socket_service* socket_service() const noexcept;
      84                 : 
      85                 : private:
      86                 :     capy::execution_context& ctx_;
      87                 :     std::unique_ptr<select_acceptor_state> state_;
      88                 : };
      89                 : 
      90                 : inline void
      91 MIS           0 : select_accept_op::cancel() noexcept
      92                 : {
      93               0 :     if (acceptor_impl_)
      94               0 :         acceptor_impl_->cancel_single_op(*this);
      95                 :     else
      96               0 :         request_cancel();
      97               0 : }
      98                 : 
      99                 : inline void
     100 HIT        3087 : select_accept_op::operator()()
     101                 : {
     102            3087 :     complete_accept_op<select_socket>(*this);
     103            3087 : }
     104                 : 
     105              61 : inline select_acceptor::select_acceptor(select_acceptor_service& svc) noexcept
     106              61 :     : reactor_acceptor(svc)
     107                 : {
     108              61 : }
     109                 : 
     110                 : inline std::coroutine_handle<>
     111            3087 : select_acceptor::accept(
     112                 :     std::coroutine_handle<> h,
     113                 :     capy::executor_ref ex,
     114                 :     std::stop_token token,
     115                 :     std::error_code* ec,
     116                 :     io_object::implementation** impl_out)
     117                 : {
     118            3087 :     auto& op = acc_;
     119            3087 :     op.reset();
     120            3087 :     op.h        = h;
     121            3087 :     op.ex       = ex;
     122            3087 :     op.ec_out   = ec;
     123            3087 :     op.impl_out = impl_out;
     124            3087 :     op.fd       = fd_;
     125            3087 :     op.start(token, this);
     126                 : 
     127            3087 :     sockaddr_storage peer_storage{};
     128            3087 :     socklen_t addrlen = sizeof(peer_storage);
     129                 :     int accepted;
     130                 :     do
     131                 :     {
     132                 :         accepted =
     133            3087 :             ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
     134                 :     }
     135            3087 :     while (accepted < 0 && errno == EINTR);
     136                 : 
     137            3087 :     if (accepted >= 0)
     138                 :     {
     139               2 :         if (accepted >= FD_SETSIZE)
     140                 :         {
     141 MIS           0 :             ::close(accepted);
     142               0 :             op.complete(EINVAL, 0);
     143               0 :             op.impl_ptr = shared_from_this();
     144               0 :             svc_.post(&op);
     145               0 :             return std::noop_coroutine();
     146                 :         }
     147                 : 
     148 HIT           2 :         int flags = ::fcntl(accepted, F_GETFL, 0);
     149               2 :         if (flags == -1)
     150                 :         {
     151 MIS           0 :             int err = errno;
     152               0 :             ::close(accepted);
     153               0 :             op.complete(err, 0);
     154               0 :             op.impl_ptr = shared_from_this();
     155               0 :             svc_.post(&op);
     156               0 :             return std::noop_coroutine();
     157                 :         }
     158                 : 
     159 HIT           2 :         if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
     160                 :         {
     161 MIS           0 :             int err = errno;
     162               0 :             ::close(accepted);
     163               0 :             op.complete(err, 0);
     164               0 :             op.impl_ptr = shared_from_this();
     165               0 :             svc_.post(&op);
     166               0 :             return std::noop_coroutine();
     167                 :         }
     168                 : 
     169 HIT           2 :         if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
     170                 :         {
     171 MIS           0 :             int err = errno;
     172               0 :             ::close(accepted);
     173               0 :             op.complete(err, 0);
     174               0 :             op.impl_ptr = shared_from_this();
     175               0 :             svc_.post(&op);
     176               0 :             return std::noop_coroutine();
     177                 :         }
     178                 : 
     179                 :         {
     180 HIT           2 :             std::lock_guard lock(desc_state_.mutex);
     181               2 :             desc_state_.read_ready = false;
     182               2 :         }
     183                 : 
     184               2 :         if (svc_.scheduler().try_consume_inline_budget())
     185                 :         {
     186 MIS           0 :             auto* socket_svc = svc_.socket_service();
     187               0 :             if (socket_svc)
     188                 :             {
     189                 :                 auto& impl =
     190               0 :                     static_cast<select_socket&>(*socket_svc->construct());
     191               0 :                 impl.set_socket(accepted);
     192                 : 
     193               0 :                 impl.desc_state_.fd = accepted;
     194                 :                 {
     195               0 :                     std::lock_guard lock(impl.desc_state_.mutex);
     196               0 :                     impl.desc_state_.read_op    = nullptr;
     197               0 :                     impl.desc_state_.write_op   = nullptr;
     198               0 :                     impl.desc_state_.connect_op = nullptr;
     199               0 :                 }
     200               0 :                 socket_svc->scheduler().register_descriptor(
     201                 :                     accepted, &impl.desc_state_);
     202                 : 
     203               0 :                 impl.set_endpoints(
     204                 :                     local_endpoint_, from_sockaddr(peer_storage));
     205                 : 
     206               0 :                 *ec = {};
     207               0 :                 if (impl_out)
     208               0 :                     *impl_out = &impl;
     209                 :             }
     210                 :             else
     211                 :             {
     212               0 :                 ::close(accepted);
     213               0 :                 *ec = make_err(ENOENT);
     214               0 :                 if (impl_out)
     215               0 :                     *impl_out = nullptr;
     216                 :             }
     217               0 :             return dispatch_coro(ex, h);
     218                 :         }
     219                 : 
     220 HIT           2 :         op.accepted_fd  = accepted;
     221               2 :         op.peer_storage = peer_storage;
     222               2 :         op.complete(0, 0);
     223               2 :         op.impl_ptr = shared_from_this();
     224               2 :         svc_.post(&op);
     225               2 :         return std::noop_coroutine();
     226                 :     }
     227                 : 
     228            3085 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     229                 :     {
     230            3085 :         op.impl_ptr = shared_from_this();
     231            3085 :         svc_.work_started();
     232                 : 
     233            3085 :         std::lock_guard lock(desc_state_.mutex);
     234            3085 :         bool io_done = false;
     235            3085 :         if (desc_state_.read_ready)
     236                 :         {
     237 MIS           0 :             desc_state_.read_ready = false;
     238               0 :             op.perform_io();
     239               0 :             io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
     240               0 :             if (!io_done)
     241               0 :                 op.errn = 0;
     242                 :         }
     243                 : 
     244 HIT        3085 :         if (io_done || op.cancelled.load(std::memory_order_acquire))
     245                 :         {
     246 MIS           0 :             svc_.post(&op);
     247               0 :             svc_.work_finished();
     248                 :         }
     249                 :         else
     250                 :         {
     251 HIT        3085 :             desc_state_.read_op = &op;
     252                 :         }
     253            3085 :         return std::noop_coroutine();
     254            3085 :     }
     255                 : 
     256 MIS           0 :     op.complete(errno, 0);
     257               0 :     op.impl_ptr = shared_from_this();
     258               0 :     svc_.post(&op);
     259               0 :     return std::noop_coroutine();
     260                 : }
     261                 : 
     262                 : inline void
     263 HIT           2 : select_acceptor::cancel() noexcept
     264                 : {
     265               2 :     do_cancel();
     266               2 : }
     267                 : 
     268                 : inline void
     269             240 : select_acceptor::close_socket() noexcept
     270                 : {
     271             240 :     do_close_socket();
     272             240 : }
     273                 : 
     274             168 : inline select_acceptor_service::select_acceptor_service(
     275             168 :     capy::execution_context& ctx)
     276             168 :     : ctx_(ctx)
     277             168 :     , state_(
     278                 :           std::make_unique<select_acceptor_state>(
     279             168 :               ctx.use_service<select_scheduler>()))
     280                 : {
     281             168 : }
     282                 : 
     283             336 : inline select_acceptor_service::~select_acceptor_service() {}
     284                 : 
     285                 : inline void
     286             168 : select_acceptor_service::shutdown()
     287                 : {
     288             168 :     std::lock_guard lock(state_->mutex_);
     289                 : 
     290             168 :     while (auto* impl = state_->impl_list_.pop_front())
     291 MIS           0 :         impl->close_socket();
     292                 : 
     293                 :     // Don't clear impl_ptrs_ here — same rationale as
     294                 :     // select_socket_service::shutdown(). Let ~state_ release ptrs
     295                 :     // after scheduler shutdown has drained all queued ops.
     296 HIT         168 : }
     297                 : 
     298                 : inline io_object::implementation*
     299              61 : select_acceptor_service::construct()
     300                 : {
     301              61 :     auto impl = std::make_shared<select_acceptor>(*this);
     302              61 :     auto* raw = impl.get();
     303                 : 
     304              61 :     std::lock_guard lock(state_->mutex_);
     305              61 :     state_->impl_ptrs_.emplace(raw, std::move(impl));
     306              61 :     state_->impl_list_.push_back(raw);
     307                 : 
     308              61 :     return raw;
     309              61 : }
     310                 : 
     311                 : inline void
     312              61 : select_acceptor_service::destroy(io_object::implementation* impl)
     313                 : {
     314              61 :     auto* select_impl = static_cast<select_acceptor*>(impl);
     315              61 :     select_impl->close_socket();
     316              61 :     std::lock_guard lock(state_->mutex_);
     317              61 :     state_->impl_list_.remove(select_impl);
     318              61 :     state_->impl_ptrs_.erase(select_impl);
     319              61 : }
     320                 : 
     321                 : inline void
     322             120 : select_acceptor_service::close(io_object::handle& h)
     323                 : {
     324             120 :     static_cast<select_acceptor*>(h.get())->close_socket();
     325             120 : }
     326                 : 
     327                 : inline std::error_code
     328              59 : select_acceptor_service::open_acceptor_socket(
     329                 :     tcp_acceptor::implementation& impl, int family, int type, int protocol)
     330                 : {
     331              59 :     auto* select_impl = static_cast<select_acceptor*>(&impl);
     332              59 :     select_impl->close_socket();
     333                 : 
     334              59 :     int fd = ::socket(family, type, protocol);
     335              59 :     if (fd < 0)
     336 MIS           0 :         return make_err(errno);
     337                 : 
     338 HIT          59 :     int flags = ::fcntl(fd, F_GETFL, 0);
     339              59 :     if (flags == -1)
     340                 :     {
     341 MIS           0 :         int errn = errno;
     342               0 :         ::close(fd);
     343               0 :         return make_err(errn);
     344                 :     }
     345 HIT          59 :     if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
     346                 :     {
     347 MIS           0 :         int errn = errno;
     348               0 :         ::close(fd);
     349               0 :         return make_err(errn);
     350                 :     }
     351 HIT          59 :     if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
     352                 :     {
     353 MIS           0 :         int errn = errno;
     354               0 :         ::close(fd);
     355               0 :         return make_err(errn);
     356                 :     }
     357                 : 
     358 HIT          59 :     if (fd >= FD_SETSIZE)
     359                 :     {
     360 MIS           0 :         ::close(fd);
     361               0 :         return make_err(EMFILE);
     362                 :     }
     363                 : 
     364 HIT          59 :     if (family == AF_INET6)
     365                 :     {
     366               8 :         int val = 0; // dual-stack default
     367               8 :         ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
     368                 :     }
     369                 : 
     370                 : #ifdef SO_NOSIGPIPE
     371                 :     {
     372                 :         int nosig = 1;
     373                 :         ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &nosig, sizeof(nosig));
     374                 :     }
     375                 : #endif
     376                 : 
     377              59 :     select_impl->fd_ = fd;
     378                 : 
     379                 :     // Set up descriptor state but do NOT register with reactor yet
     380                 :     // (registration happens in do_listen via reactor_acceptor base)
     381              59 :     select_impl->desc_state_.fd = fd;
     382                 :     {
     383              59 :         std::lock_guard lock(select_impl->desc_state_.mutex);
     384              59 :         select_impl->desc_state_.read_op = nullptr;
     385              59 :     }
     386                 : 
     387              59 :     return {};
     388                 : }
     389                 : 
     390                 : inline std::error_code
     391              58 : select_acceptor_service::bind_acceptor(
     392                 :     tcp_acceptor::implementation& impl, endpoint ep)
     393                 : {
     394              58 :     return static_cast<select_acceptor*>(&impl)->do_bind(ep);
     395                 : }
     396                 : 
     397                 : inline std::error_code
     398              57 : select_acceptor_service::listen_acceptor(
     399                 :     tcp_acceptor::implementation& impl, int backlog)
     400                 : {
     401              57 :     return static_cast<select_acceptor*>(&impl)->do_listen(backlog);
     402                 : }
     403                 : 
     404                 : inline void
     405               5 : select_acceptor_service::post(scheduler_op* op)
     406                 : {
     407               5 :     state_->sched_.post(op);
     408               5 : }
     409                 : 
     410                 : inline void
     411            3085 : select_acceptor_service::work_started() noexcept
     412                 : {
     413            3085 :     state_->sched_.work_started();
     414            3085 : }
     415                 : 
     416                 : inline void
     417               3 : select_acceptor_service::work_finished() noexcept
     418                 : {
     419               3 :     state_->sched_.work_finished();
     420               3 : }
     421                 : 
     422                 : inline select_socket_service*
     423            3084 : select_acceptor_service::socket_service() const noexcept
     424                 : {
     425            3084 :     auto* svc = ctx_.find_service<detail::socket_service>();
     426            3084 :     return svc ? dynamic_cast<select_socket_service*>(svc) : nullptr;
     427                 : }
     428                 : 
     429                 : } // namespace boost::corosio::detail
     430                 : 
     431                 : #endif // BOOST_COROSIO_HAS_SELECT
     432                 : 
     433                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
        

Generated by: LCOV version 2.3