LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_op_complete.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 89.3 % 84 75 9
Test Date: 2026-03-13 23:11:04 Functions: 100.0 % 8 8

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      14                 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
      15                 : #include <boost/corosio/native/detail/make_err.hpp>
      16                 : #include <boost/corosio/io/io_object.hpp>
      17                 : 
      18                 : #include <coroutine>
      19                 : #include <mutex>
      20                 : #include <utility>
      21                 : 
      22                 : #include <netinet/in.h>
      23                 : #include <sys/socket.h>
      24                 : #include <unistd.h>
      25                 : 
      26                 : namespace boost::corosio::detail {
      27                 : 
      28                 : /** Complete a base read/write operation.
      29                 : 
      30                 :     Translates the recorded errno and cancellation state into
      31                 :     an error_code, stores the byte count, then resumes the
      32                 :     caller via symmetric transfer.
      33                 : 
      34                 :     @tparam Op The concrete operation type.
      35                 :     @param op The operation to complete.
      36                 : */
      37                 : template <typename Op>
      38                 : void
      39 HIT      101676 : complete_io_op(Op& op)
      40                 : {
      41          101676 :     op.stop_cb.reset();
      42          101676 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
      43                 : 
      44          101676 :     if (op.cancelled.load(std::memory_order_acquire))
      45             310 :         *op.ec_out = capy::error::canceled;
      46          101366 :     else if (op.errn != 0)
      47 MIS           0 :         *op.ec_out = make_err(op.errn);
      48 HIT      101366 :     else if (op.is_read_operation() && op.bytes_transferred == 0)
      49 MIS           0 :         *op.ec_out = capy::error::eof;
      50                 :     else
      51 HIT      101366 :         *op.ec_out = {};
      52                 : 
      53          101676 :     *op.bytes_out = op.bytes_transferred;
      54                 : 
      55          101676 :     capy::executor_ref saved_ex(op.ex);
      56          101676 :     std::coroutine_handle<> saved_h(op.h);
      57          101676 :     auto prevent = std::move(op.impl_ptr);
      58          101676 :     dispatch_coro(saved_ex, saved_h).resume();
      59          101676 : }
      60                 : 
      61                 : /** Complete a connect operation with endpoint caching.
      62                 : 
      63                 :     On success, queries the local endpoint via getsockname and
      64                 :     caches both endpoints in the socket impl. Then resumes the
      65                 :     caller via symmetric transfer.
      66                 : 
      67                 :     @tparam Op The concrete connect operation type.
      68                 :     @param op The operation to complete.
      69                 : */
      70                 : template <typename Op>
      71                 : void
      72            7485 : complete_connect_op(Op& op)
      73                 : {
      74            7485 :     op.stop_cb.reset();
      75            7485 :     op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
      76                 : 
      77            7485 :     bool success =
      78            7485 :         (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
      79                 : 
      80            7485 :     if (success && op.socket_impl_)
      81                 :     {
      82            7481 :         endpoint local_ep;
      83            7481 :         sockaddr_storage local_storage{};
      84            7481 :         socklen_t local_len = sizeof(local_storage);
      85            7481 :         if (::getsockname(
      86                 :                 op.fd,
      87                 :                 reinterpret_cast<sockaddr*>(&local_storage),
      88            7481 :                 &local_len) == 0)
      89            7481 :             local_ep = from_sockaddr(local_storage);
      90            7481 :         op.socket_impl_->set_endpoints(local_ep, op.target_endpoint);
      91                 :     }
      92                 : 
      93            7485 :     if (op.cancelled.load(std::memory_order_acquire))
      94 MIS           0 :         *op.ec_out = capy::error::canceled;
      95 HIT        7485 :     else if (op.errn != 0)
      96               4 :         *op.ec_out = make_err(op.errn);
      97                 :     else
      98            7481 :         *op.ec_out = {};
      99                 : 
     100            7485 :     capy::executor_ref saved_ex(op.ex);
     101            7485 :     std::coroutine_handle<> saved_h(op.h);
     102            7485 :     auto prevent = std::move(op.impl_ptr);
     103            7485 :     dispatch_coro(saved_ex, saved_h).resume();
     104            7485 : }
     105                 : 
     106                 : /** Construct and register a peer socket from an accepted fd.
     107                 : 
     108                 :     Creates a new socket impl via the acceptor's associated
     109                 :     socket service, registers it with the scheduler, and caches
     110                 :     the local and remote endpoints.
     111                 : 
     112                 :     @tparam SocketImpl The concrete socket implementation type.
     113                 :     @tparam AcceptorImpl The concrete acceptor implementation type.
     114                 :     @param acceptor_impl The acceptor that accepted the connection.
     115                 :     @param accepted_fd The accepted file descriptor (set to -1 on success).
     116                 :     @param peer_storage The peer address from accept().
     117                 :     @param impl_out Output pointer for the new socket impl.
     118                 :     @param ec_out Output pointer for any error.
     119                 :     @return True on success, false on failure.
     120                 : */
     121                 : template <typename SocketImpl, typename AcceptorImpl>
     122                 : bool
     123            7481 : setup_accepted_socket(
     124                 :     AcceptorImpl* acceptor_impl,
     125                 :     int& accepted_fd,
     126                 :     sockaddr_storage const& peer_storage,
     127                 :     io_object::implementation** impl_out,
     128                 :     std::error_code* ec_out)
     129                 : {
     130            7481 :     auto* socket_svc = acceptor_impl->service().socket_service();
     131            7481 :     if (!socket_svc)
     132                 :     {
     133 MIS           0 :         *ec_out = make_err(ENOENT);
     134               0 :         return false;
     135                 :     }
     136                 : 
     137 HIT        7481 :     auto& impl = static_cast<SocketImpl&>(*socket_svc->construct());
     138            7481 :     impl.set_socket(accepted_fd);
     139                 : 
     140            7481 :     impl.desc_state_.fd = accepted_fd;
     141                 :     {
     142            7481 :         std::lock_guard lock(impl.desc_state_.mutex);
     143            7481 :         impl.desc_state_.read_op    = nullptr;
     144            7481 :         impl.desc_state_.write_op   = nullptr;
     145            7481 :         impl.desc_state_.connect_op = nullptr;
     146            7481 :     }
     147            7481 :     socket_svc->scheduler().register_descriptor(
     148                 :         accepted_fd, &impl.desc_state_);
     149                 : 
     150            7481 :     impl.set_endpoints(
     151                 :         acceptor_impl->local_endpoint(),
     152                 :         from_sockaddr(peer_storage));
     153                 : 
     154            7481 :     if (impl_out)
     155            7481 :         *impl_out = &impl;
     156            7481 :     accepted_fd = -1;
     157            7481 :     return true;
     158                 : }
     159                 : 
     160                 : /** Complete an accept operation.
     161                 : 
     162                 :     Sets up the peer socket on success, or closes the accepted
     163                 :     fd on failure. Then resumes the caller via symmetric transfer.
     164                 : 
     165                 :     @tparam SocketImpl The concrete socket implementation type.
     166                 :     @tparam Op The concrete accept operation type.
     167                 :     @param op The operation to complete.
     168                 : */
     169                 : template <typename SocketImpl, typename Op>
     170                 : void
     171            7493 : complete_accept_op(Op& op)
     172                 : {
     173            7493 :     op.stop_cb.reset();
     174            7493 :     op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget();
     175                 : 
     176            7493 :     bool success =
     177            7493 :         (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
     178                 : 
     179            7493 :     if (op.cancelled.load(std::memory_order_acquire))
     180              12 :         *op.ec_out = capy::error::canceled;
     181            7481 :     else if (op.errn != 0)
     182 MIS           0 :         *op.ec_out = make_err(op.errn);
     183                 :     else
     184 HIT        7481 :         *op.ec_out = {};
     185                 : 
     186            7493 :     if (success && op.accepted_fd >= 0 && op.acceptor_impl_)
     187                 :     {
     188            7481 :         if (!setup_accepted_socket<SocketImpl>(
     189                 :                 op.acceptor_impl_,
     190            7481 :                 op.accepted_fd,
     191            7481 :                 op.peer_storage,
     192                 :                 op.impl_out,
     193                 :                 op.ec_out))
     194 MIS           0 :             success = false;
     195                 :     }
     196                 : 
     197 HIT        7493 :     if (!success || !op.acceptor_impl_)
     198                 :     {
     199              12 :         if (op.accepted_fd >= 0)
     200                 :         {
     201 MIS           0 :             ::close(op.accepted_fd);
     202               0 :             op.accepted_fd = -1;
     203                 :         }
     204 HIT          12 :         if (op.impl_out)
     205              12 :             *op.impl_out = nullptr;
     206                 :     }
     207                 : 
     208            7493 :     capy::executor_ref saved_ex(op.ex);
     209            7493 :     std::coroutine_handle<> saved_h(op.h);
     210            7493 :     auto prevent = std::move(op.impl_ptr);
     211            7493 :     dispatch_coro(saved_ex, saved_h).resume();
     212            7493 : }
     213                 : 
     214                 : } // namespace boost::corosio::detail
     215                 : 
     216                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
        

Generated by: LCOV version 2.3