LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_op.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 85.7 % 84 72 12
Test Date: 2026-03-13 23:11:04 Functions: 87.5 % 32 28 4

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
      12                 : 
      13                 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
      14                 : #include <boost/corosio/io/io_object.hpp>
      15                 : #include <boost/corosio/endpoint.hpp>
      16                 : #include <boost/capy/ex/executor_ref.hpp>
      17                 : 
      18                 : #include <atomic>
      19                 : #include <coroutine>
      20                 : #include <cstddef>
      21                 : #include <memory>
      22                 : #include <optional>
      23                 : #include <stop_token>
      24                 : #include <system_error>
      25                 : 
      26                 : #include <errno.h>
      27                 : 
      28                 : #include <netinet/in.h>
      29                 : #include <sys/socket.h>
      30                 : #include <sys/uio.h>
      31                 : 
      32                 : namespace boost::corosio::detail {
      33                 : 
      34                 : /** Base operation for reactor-based backends.
      35                 : 
      36                 :     Holds per-operation state that depends on the concrete backend
      37                 :     socket/acceptor types: coroutine handle, executor, output
      38                 :     pointers, file descriptor, stop_callback, and type-specific
      39                 :     impl pointers.
      40                 : 
      41                 :     Fields shared across all backends (errn, bytes_transferred,
      42                 :     cancelled, impl_ptr, perform_io, complete) live in
      43                 :     reactor_op_base so the scheduler and descriptor_state can
      44                 :     access them without template instantiation.
      45                 : 
      46                 :     @tparam Socket The backend socket impl type (forward-declared).
      47                 :     @tparam Acceptor The backend acceptor impl type (forward-declared).
      48                 : */
      49                 : template<class Socket, class Acceptor>
      50                 : struct reactor_op : reactor_op_base
      51                 : {
      52                 :     /// Stop-token callback that invokes cancel() on the target op.
      53                 :     struct canceller
      54                 :     {
      55                 :         reactor_op* op;
      56 HIT         199 :         void operator()() const noexcept
      57                 :         {
      58             199 :             op->cancel();
      59             199 :         }
      60                 :     };
      61                 : 
      62                 :     /// Caller's coroutine handle to resume on completion.
      63                 :     std::coroutine_handle<> h;
      64                 : 
      65                 :     /// Executor for dispatching the completion.
      66                 :     capy::executor_ref ex;
      67                 : 
      68                 :     /// Output pointer for the error code.
      69                 :     std::error_code* ec_out = nullptr;
      70                 : 
      71                 :     /// Output pointer for bytes transferred.
      72                 :     std::size_t* bytes_out = nullptr;
      73                 : 
      74                 :     /// File descriptor this operation targets.
      75                 :     int fd = -1;
      76                 : 
      77                 :     /// Stop-token callback registration.
      78                 :     std::optional<std::stop_callback<canceller>> stop_cb;
      79                 : 
      80                 :     /// Owning socket impl (for stop_token cancellation).
      81                 :     Socket* socket_impl_ = nullptr;
      82                 : 
      83                 :     /// Owning acceptor impl (for stop_token cancellation).
      84                 :     Acceptor* acceptor_impl_ = nullptr;
      85                 : 
      86           67737 :     reactor_op() = default;
      87                 : 
      88                 :     /// Reset operation state for reuse.
      89          522035 :     void reset() noexcept
      90                 :     {
      91          522035 :         fd                = -1;
      92          522035 :         errn              = 0;
      93          522035 :         bytes_transferred = 0;
      94          522035 :         cancelled.store(false, std::memory_order_relaxed);
      95          522035 :         impl_ptr.reset();
      96          522035 :         socket_impl_   = nullptr;
      97          522035 :         acceptor_impl_ = nullptr;
      98          522035 :     }
      99                 : 
     100                 :     /// Return true if this is a read-direction operation.
     101           50651 :     virtual bool is_read_operation() const noexcept
     102                 :     {
     103           50651 :         return false;
     104                 :     }
     105                 : 
     106                 :     /// Cancel this operation via the owning impl.
     107                 :     virtual void cancel() noexcept = 0;
     108                 : 
     109                 :     /// Destroy without invoking.
     110 MIS           0 :     void destroy() override
     111                 :     {
     112               0 :         stop_cb.reset();
     113               0 :         reactor_op_base::destroy();
     114               0 :     }
     115                 : 
     116                 :     /// Arm the stop-token callback for a socket operation.
     117 HIT      109161 :     void start(std::stop_token const& token, Socket* impl)
     118                 :     {
     119          109161 :         cancelled.store(false, std::memory_order_release);
     120          109161 :         stop_cb.reset();
     121          109161 :         socket_impl_   = impl;
     122          109161 :         acceptor_impl_ = nullptr;
     123                 : 
     124          109161 :         if (token.stop_possible())
     125             195 :             stop_cb.emplace(token, canceller{this});
     126          109161 :     }
     127                 : 
     128                 :     /// Arm the stop-token callback for an acceptor operation.
     129            7493 :     void start(std::stop_token const& token, Acceptor* impl)
     130                 :     {
     131            7493 :         cancelled.store(false, std::memory_order_release);
     132            7493 :         stop_cb.reset();
     133            7493 :         socket_impl_   = nullptr;
     134            7493 :         acceptor_impl_ = impl;
     135                 : 
     136            7493 :         if (token.stop_possible())
     137               9 :             stop_cb.emplace(token, canceller{this});
     138            7493 :     }
     139                 : };
     140                 : 
     141                 : /** Shared connect operation.
     142                 : 
     143                 :     Checks SO_ERROR for connect completion status. The operator()()
     144                 :     and cancel() are provided by the concrete backend type.
     145                 : 
     146                 :     @tparam Base The backend's base op type.
     147                 : */
     148                 : template<class Base>
     149                 : struct reactor_connect_op : Base
     150                 : {
     151                 :     /// Endpoint to connect to.
     152                 :     endpoint target_endpoint;
     153                 : 
     154                 :     /// Reset operation state for reuse.
     155            7485 :     void reset() noexcept
     156                 :     {
     157            7485 :         Base::reset();
     158            7485 :         target_endpoint = endpoint{};
     159            7485 :     }
     160                 : 
     161            7484 :     void perform_io() noexcept override
     162                 :     {
     163            7484 :         int err       = 0;
     164            7484 :         socklen_t len = sizeof(err);
     165            7484 :         if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     166 MIS           0 :             err = errno;
     167 HIT        7484 :         this->complete(err, 0);
     168            7484 :     }
     169                 : };
     170                 : 
     171                 : /** Shared scatter-read operation.
     172                 : 
     173                 :     Uses readv() with an EINTR retry loop.
     174                 : 
     175                 :     @tparam Base The backend's base op type.
     176                 : */
     177                 : template<class Base>
     178                 : struct reactor_read_op : Base
     179                 : {
     180                 :     /// Maximum scatter-gather buffer count.
     181                 :     static constexpr std::size_t max_buffers = 16;
     182                 : 
     183                 :     /// Scatter-gather I/O vectors.
     184                 :     iovec iovecs[max_buffers];
     185                 : 
     186                 :     /// Number of active I/O vectors.
     187                 :     int iovec_count = 0;
     188                 : 
     189                 :     /// True for zero-length reads (completed immediately).
     190                 :     bool empty_buffer_read = false;
     191                 : 
     192                 :     /// Return true (this is a read-direction operation).
     193           50715 :     bool is_read_operation() const noexcept override
     194                 :     {
     195           50715 :         return !empty_buffer_read;
     196                 :     }
     197                 : 
     198          253679 :     void reset() noexcept
     199                 :     {
     200          253679 :         Base::reset();
     201          253679 :         iovec_count       = 0;
     202          253679 :         empty_buffer_read = false;
     203          253679 :     }
     204                 : 
     205             333 :     void perform_io() noexcept override
     206                 :     {
     207                 :         ssize_t n;
     208                 :         do
     209                 :         {
     210             333 :             n = ::readv(this->fd, iovecs, iovec_count);
     211                 :         }
     212             333 :         while (n < 0 && errno == EINTR);
     213                 : 
     214             333 :         if (n >= 0)
     215             100 :             this->complete(0, static_cast<std::size_t>(n));
     216                 :         else
     217             233 :             this->complete(errno, 0);
     218             333 :     }
     219                 : };
     220                 : 
     221                 : /** Shared gather-write operation.
     222                 : 
     223                 :     Delegates the actual syscall to WritePolicy::write(fd, iovecs, count),
     224                 :     which returns ssize_t (bytes written or -1 with errno set).
     225                 : 
     226                 :     @tparam Base The backend's base op type.
     227                 :     @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
     228                 : */
     229                 : template<class Base, class WritePolicy>
     230                 : struct reactor_write_op : Base
     231                 : {
     232                 :     /// The write syscall policy type.
     233                 :     using write_policy = WritePolicy;
     234                 : 
     235                 :     /// Maximum scatter-gather buffer count.
     236                 :     static constexpr std::size_t max_buffers = 16;
     237                 : 
     238                 :     /// Scatter-gather I/O vectors.
     239                 :     iovec iovecs[max_buffers];
     240                 : 
     241                 :     /// Number of active I/O vectors.
     242                 :     int iovec_count = 0;
     243                 : 
     244          253378 :     void reset() noexcept
     245                 :     {
     246          253378 :         Base::reset();
     247          253378 :         iovec_count = 0;
     248          253378 :     }
     249                 : 
     250 MIS           0 :     void perform_io() noexcept override
     251                 :     {
     252               0 :         ssize_t n = WritePolicy::write(this->fd, iovecs, iovec_count);
     253               0 :         if (n >= 0)
     254               0 :             this->complete(0, static_cast<std::size_t>(n));
     255                 :         else
     256               0 :             this->complete(errno, 0);
     257               0 :     }
     258                 : };
     259                 : 
     260                 : /** Shared accept operation.
     261                 : 
     262                 :     Delegates the actual syscall to AcceptPolicy::do_accept(fd, peer_storage),
     263                 :     which returns the accepted fd or -1 with errno set.
     264                 : 
     265                 :     @tparam Base The backend's base op type.
     266                 :     @tparam AcceptPolicy Provides `static int do_accept(int, sockaddr_storage&)`.
     267                 : */
     268                 : template<class Base, class AcceptPolicy>
     269                 : struct reactor_accept_op : Base
     270                 : {
     271                 :     /// File descriptor of the accepted connection.
     272                 :     int accepted_fd = -1;
     273                 : 
     274                 :     /// Pointer to the peer socket implementation.
     275                 :     io_object::implementation* peer_impl = nullptr;
     276                 : 
     277                 :     /// Output pointer for the accepted implementation.
     278                 :     io_object::implementation** impl_out = nullptr;
     279                 : 
     280                 :     /// Peer address storage filled by accept.
     281                 :     sockaddr_storage peer_storage{};
     282                 : 
     283 HIT        7493 :     void reset() noexcept
     284                 :     {
     285            7493 :         Base::reset();
     286            7493 :         accepted_fd  = -1;
     287            7493 :         peer_impl    = nullptr;
     288            7493 :         impl_out     = nullptr;
     289            7493 :         peer_storage = {};
     290            7493 :     }
     291                 : 
     292            7477 :     void perform_io() noexcept override
     293                 :     {
     294            7477 :         int new_fd = AcceptPolicy::do_accept(this->fd, peer_storage);
     295            7477 :         if (new_fd >= 0)
     296                 :         {
     297            7477 :             accepted_fd = new_fd;
     298            7477 :             this->complete(0, 0);
     299                 :         }
     300                 :         else
     301                 :         {
     302 MIS           0 :             this->complete(errno, 0);
     303                 :         }
     304 HIT        7477 :     }
     305                 : };
     306                 : 
     307                 : } // namespace boost::corosio::detail
     308                 : 
     309                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
        

Generated by: LCOV version 2.3