LCOV - code coverage report
Current view: top level - corosio/native/detail/epoll - epoll_acceptor_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 76.6 % 145 111 34
Test Date: 2026-03-13 23:11:04 Functions: 100.0 % 21 21

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_EPOLL
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : #include <boost/corosio/detail/acceptor_service.hpp>
      20                 : 
      21                 : #include <boost/corosio/native/detail/epoll/epoll_acceptor.hpp>
      22                 : #include <boost/corosio/native/detail/epoll/epoll_socket_service.hpp>
      23                 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
      24                 : #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
      25                 : 
      26                 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
      27                 : 
      28                 : #include <memory>
      29                 : #include <mutex>
      30                 : #include <utility>
      31                 : 
      32                 : #include <errno.h>
      33                 : #include <netinet/in.h>
      34                 : #include <sys/epoll.h>
      35                 : #include <sys/socket.h>
      36                 : #include <unistd.h>
      37                 : 
      38                 : namespace boost::corosio::detail {
      39                 : 
      40                 : /// State for epoll acceptor service.
      41                 : using epoll_acceptor_state =
      42                 :     reactor_service_state<epoll_scheduler, epoll_acceptor>;
      43                 : 
      44                 : /** epoll acceptor service implementation.
      45                 : 
      46                 :     Inherits from acceptor_service to enable runtime polymorphism.
      47                 :     Uses key_type = acceptor_service for service lookup.
      48                 : */
      49                 : class BOOST_COROSIO_DECL epoll_acceptor_service final : public acceptor_service
      50                 : {
      51                 : public:
      52                 :     explicit epoll_acceptor_service(capy::execution_context& ctx);
      53                 :     ~epoll_acceptor_service() override;
      54                 : 
      55                 :     epoll_acceptor_service(epoll_acceptor_service const&)            = delete;
      56                 :     epoll_acceptor_service& operator=(epoll_acceptor_service const&) = delete;
      57                 : 
      58                 :     void shutdown() override;
      59                 : 
      60                 :     io_object::implementation* construct() override;
      61                 :     void destroy(io_object::implementation*) override;
      62                 :     void close(io_object::handle&) override;
      63                 :     std::error_code open_acceptor_socket(
      64                 :         tcp_acceptor::implementation& impl,
      65                 :         int family,
      66                 :         int type,
      67                 :         int protocol) override;
      68                 :     std::error_code
      69                 :     bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
      70                 :     std::error_code
      71                 :     listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
      72                 : 
      73 HIT         152 :     epoll_scheduler& scheduler() const noexcept
      74                 :     {
      75             152 :         return state_->sched_;
      76                 :     }
      77                 :     void post(scheduler_op* op);
      78                 :     void work_started() noexcept;
      79                 :     void work_finished() noexcept;
      80                 : 
      81                 :     /** Get the socket service for creating peer sockets during accept. */
      82                 :     epoll_socket_service* socket_service() const noexcept;
      83                 : 
      84                 : private:
      85                 :     capy::execution_context& ctx_;
      86                 :     std::unique_ptr<epoll_acceptor_state> state_;
      87                 : };
      88                 : 
      89                 : inline void
      90               6 : epoll_accept_op::cancel() noexcept
      91                 : {
      92               6 :     if (acceptor_impl_)
      93               6 :         acceptor_impl_->cancel_single_op(*this);
      94                 :     else
      95 MIS           0 :         request_cancel();
      96 HIT           6 : }
      97                 : 
      98                 : inline void
      99            4406 : epoll_accept_op::operator()()
     100                 : {
     101            4406 :     complete_accept_op<epoll_socket>(*this);
     102            4406 : }
     103                 : 
     104              80 : inline epoll_acceptor::epoll_acceptor(epoll_acceptor_service& svc) noexcept
     105              80 :     : reactor_acceptor(svc)
     106                 : {
     107              80 : }
     108                 : 
     109                 : inline std::coroutine_handle<>
     110            4406 : epoll_acceptor::accept(
     111                 :     std::coroutine_handle<> h,
     112                 :     capy::executor_ref ex,
     113                 :     std::stop_token token,
     114                 :     std::error_code* ec,
     115                 :     io_object::implementation** impl_out)
     116                 : {
     117            4406 :     auto& op = acc_;
     118            4406 :     op.reset();
     119            4406 :     op.h        = h;
     120            4406 :     op.ex       = ex;
     121            4406 :     op.ec_out   = ec;
     122            4406 :     op.impl_out = impl_out;
     123            4406 :     op.fd       = fd_;
     124            4406 :     op.start(token, this);
     125                 : 
     126            4406 :     sockaddr_storage peer_storage{};
     127            4406 :     socklen_t addrlen = sizeof(peer_storage);
     128                 :     int accepted;
     129                 :     do
     130                 :     {
     131            4406 :         accepted = ::accept4(
     132                 :             fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
     133                 :             SOCK_NONBLOCK | SOCK_CLOEXEC);
     134                 :     }
     135            4406 :     while (accepted < 0 && errno == EINTR);
     136                 : 
     137            4406 :     if (accepted >= 0)
     138                 :     {
     139                 :         {
     140               2 :             std::lock_guard lock(desc_state_.mutex);
     141               2 :             desc_state_.read_ready = false;
     142               2 :         }
     143                 : 
     144               2 :         if (svc_.scheduler().try_consume_inline_budget())
     145                 :         {
     146 MIS           0 :             auto* socket_svc = svc_.socket_service();
     147               0 :             if (socket_svc)
     148                 :             {
     149                 :                 auto& impl =
     150               0 :                     static_cast<epoll_socket&>(*socket_svc->construct());
     151               0 :                 impl.set_socket(accepted);
     152                 : 
     153               0 :                 impl.desc_state_.fd = accepted;
     154                 :                 {
     155               0 :                     std::lock_guard lock(impl.desc_state_.mutex);
     156               0 :                     impl.desc_state_.read_op    = nullptr;
     157               0 :                     impl.desc_state_.write_op   = nullptr;
     158               0 :                     impl.desc_state_.connect_op = nullptr;
     159               0 :                 }
     160               0 :                 socket_svc->scheduler().register_descriptor(
     161                 :                     accepted, &impl.desc_state_);
     162                 : 
     163               0 :                 impl.set_endpoints(
     164                 :                     local_endpoint_, from_sockaddr(peer_storage));
     165                 : 
     166               0 :                 *ec = {};
     167               0 :                 if (impl_out)
     168               0 :                     *impl_out = &impl;
     169                 :             }
     170                 :             else
     171                 :             {
     172               0 :                 ::close(accepted);
     173               0 :                 *ec = make_err(ENOENT);
     174               0 :                 if (impl_out)
     175               0 :                     *impl_out = nullptr;
     176                 :             }
     177               0 :             return dispatch_coro(ex, h);
     178                 :         }
     179                 : 
     180 HIT           2 :         op.accepted_fd  = accepted;
     181               2 :         op.peer_storage = peer_storage;
     182               2 :         op.complete(0, 0);
     183               2 :         op.impl_ptr = shared_from_this();
     184               2 :         svc_.post(&op);
     185               2 :         return std::noop_coroutine();
     186                 :     }
     187                 : 
     188            4404 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     189                 :     {
     190            4404 :         op.impl_ptr = shared_from_this();
     191            4404 :         svc_.work_started();
     192                 : 
     193            4404 :         std::lock_guard lock(desc_state_.mutex);
     194            4404 :         bool io_done = false;
     195            4404 :         if (desc_state_.read_ready)
     196                 :         {
     197 MIS           0 :             desc_state_.read_ready = false;
     198               0 :             op.perform_io();
     199               0 :             io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
     200               0 :             if (!io_done)
     201               0 :                 op.errn = 0;
     202                 :         }
     203                 : 
     204 HIT        4404 :         if (io_done || op.cancelled.load(std::memory_order_acquire))
     205                 :         {
     206 MIS           0 :             svc_.post(&op);
     207               0 :             svc_.work_finished();
     208                 :         }
     209                 :         else
     210                 :         {
     211 HIT        4404 :             desc_state_.read_op = &op;
     212                 :         }
     213            4404 :         return std::noop_coroutine();
     214            4404 :     }
     215                 : 
     216 MIS           0 :     op.complete(errno, 0);
     217               0 :     op.impl_ptr = shared_from_this();
     218               0 :     svc_.post(&op);
     219                 :     // completion is always posted to scheduler queue, never inline.
     220               0 :     return std::noop_coroutine();
     221                 : }
     222                 : 
     223                 : inline void
     224 HIT           2 : epoll_acceptor::cancel() noexcept
     225                 : {
     226               2 :     do_cancel();
     227               2 : }
     228                 : 
     229                 : inline void
     230             318 : epoll_acceptor::close_socket() noexcept
     231                 : {
     232             318 :     do_close_socket();
     233             318 : }
     234                 : 
     235             244 : inline epoll_acceptor_service::epoll_acceptor_service(
     236             244 :     capy::execution_context& ctx)
     237             244 :     : ctx_(ctx)
     238             244 :     , state_(
     239                 :           std::make_unique<epoll_acceptor_state>(
     240             244 :               ctx.use_service<epoll_scheduler>()))
     241                 : {
     242             244 : }
     243                 : 
     244             488 : inline epoll_acceptor_service::~epoll_acceptor_service() {}
     245                 : 
     246                 : inline void
     247             244 : epoll_acceptor_service::shutdown()
     248                 : {
     249             244 :     std::lock_guard lock(state_->mutex_);
     250                 : 
     251             244 :     while (auto* impl = state_->impl_list_.pop_front())
     252 MIS           0 :         impl->close_socket();
     253                 : 
     254                 :     // Don't clear impl_ptrs_ here — same rationale as
     255                 :     // epoll_socket_service::shutdown(). Let ~state_ release ptrs
     256                 :     // after scheduler shutdown has drained all queued ops.
     257 HIT         244 : }
     258                 : 
     259                 : inline io_object::implementation*
     260              80 : epoll_acceptor_service::construct()
     261                 : {
     262              80 :     auto impl = std::make_shared<epoll_acceptor>(*this);
     263              80 :     auto* raw = impl.get();
     264                 : 
     265              80 :     std::lock_guard lock(state_->mutex_);
     266              80 :     state_->impl_ptrs_.emplace(raw, std::move(impl));
     267              80 :     state_->impl_list_.push_back(raw);
     268                 : 
     269              80 :     return raw;
     270              80 : }
     271                 : 
     272                 : inline void
     273              80 : epoll_acceptor_service::destroy(io_object::implementation* impl)
     274                 : {
     275              80 :     auto* epoll_impl = static_cast<epoll_acceptor*>(impl);
     276              80 :     epoll_impl->close_socket();
     277              80 :     std::lock_guard lock(state_->mutex_);
     278              80 :     state_->impl_list_.remove(epoll_impl);
     279              80 :     state_->impl_ptrs_.erase(epoll_impl);
     280              80 : }
     281                 : 
     282                 : inline void
     283             159 : epoll_acceptor_service::close(io_object::handle& h)
     284                 : {
     285             159 :     static_cast<epoll_acceptor*>(h.get())->close_socket();
     286             159 : }
     287                 : 
     288                 : inline std::error_code
     289              79 : epoll_acceptor_service::open_acceptor_socket(
     290                 :     tcp_acceptor::implementation& impl, int family, int type, int protocol)
     291                 : {
     292              79 :     auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
     293              79 :     epoll_impl->close_socket();
     294                 : 
     295              79 :     int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
     296              79 :     if (fd < 0)
     297 MIS           0 :         return make_err(errno);
     298                 : 
     299 HIT          79 :     if (family == AF_INET6)
     300                 :     {
     301               8 :         int val = 0; // dual-stack default
     302               8 :         ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
     303                 :     }
     304                 : 
     305              79 :     epoll_impl->fd_ = fd;
     306                 : 
     307                 :     // Set up descriptor state but do NOT register with epoll yet
     308              79 :     epoll_impl->desc_state_.fd = fd;
     309                 :     {
     310              79 :         std::lock_guard lock(epoll_impl->desc_state_.mutex);
     311              79 :         epoll_impl->desc_state_.read_op = nullptr;
     312              79 :     }
     313                 : 
     314              79 :     return {};
     315                 : }
     316                 : 
     317                 : inline std::error_code
     318              78 : epoll_acceptor_service::bind_acceptor(
     319                 :     tcp_acceptor::implementation& impl, endpoint ep)
     320                 : {
     321              78 :     return static_cast<epoll_acceptor*>(&impl)->do_bind(ep);
     322                 : }
     323                 : 
     324                 : inline std::error_code
     325              75 : epoll_acceptor_service::listen_acceptor(
     326                 :     tcp_acceptor::implementation& impl, int backlog)
     327                 : {
     328              75 :     return static_cast<epoll_acceptor*>(&impl)->do_listen(backlog);
     329                 : }
     330                 : 
     331                 : inline void
     332              11 : epoll_acceptor_service::post(scheduler_op* op)
     333                 : {
     334              11 :     state_->sched_.post(op);
     335              11 : }
     336                 : 
     337                 : inline void
     338            4404 : epoll_acceptor_service::work_started() noexcept
     339                 : {
     340            4404 :     state_->sched_.work_started();
     341            4404 : }
     342                 : 
     343                 : inline void
     344               9 : epoll_acceptor_service::work_finished() noexcept
     345                 : {
     346               9 :     state_->sched_.work_finished();
     347               9 : }
     348                 : 
     349                 : inline epoll_socket_service*
     350            4397 : epoll_acceptor_service::socket_service() const noexcept
     351                 : {
     352            4397 :     auto* svc = ctx_.find_service<detail::socket_service>();
     353            4397 :     return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
     354                 : }
     355                 : 
     356                 : } // namespace boost::corosio::detail
     357                 : 
     358                 : #endif // BOOST_COROSIO_HAS_EPOLL
     359                 : 
     360                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
        

Generated by: LCOV version 2.3