LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_socket.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 77.8 % 302 235 67
Test Date: 2026-03-13 23:11:04 Functions: 94.4 % 36 34 2

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SOCKET_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SOCKET_HPP
      12                 : 
      13                 : #include <boost/corosio/tcp_socket.hpp>
      14                 : #include <boost/corosio/detail/intrusive.hpp>
      15                 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
      16                 : #include <boost/corosio/native/detail/make_err.hpp>
      17                 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
      18                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      19                 : #include <boost/capy/buffers.hpp>
      20                 : 
      21                 : #include <coroutine>
      22                 : #include <memory>
      23                 : #include <mutex>
      24                 : #include <utility>
      25                 : 
      26                 : #include <errno.h>
      27                 : #include <netinet/in.h>
      28                 : #include <sys/socket.h>
      29                 : #include <sys/uio.h>
      30                 : #include <unistd.h>
      31                 : 
      32                 : namespace boost::corosio::detail {
      33                 : 
      34                 : /** CRTP base for reactor-backed socket implementations.
      35                 : 
      36                 :     Provides shared data members, trivial virtual overrides,
      37                 :     non-virtual helper methods for cancellation, registration,
      38                 :     close, and the full I/O dispatch logic (`do_connect`,
      39                 :     `do_read_some`, `do_write_some`). Concrete backends inherit
      40                 :     and add `cancel()`, `close_socket()`, and I/O overrides that
      41                 :     delegate to the `do_*` helpers.
      42                 : 
      43                 :     @tparam Derived   The concrete socket type (CRTP).
      44                 :     @tparam Service   The backend's socket service type.
      45                 :     @tparam Op        The backend's base op type.
      46                 :     @tparam ConnOp    The backend's connect op type.
      47                 :     @tparam ReadOp    The backend's read op type.
      48                 :     @tparam WriteOp   The backend's write op type.
      49                 :     @tparam DescState The backend's descriptor_state type.
      50                 : */
      51                 : template<
      52                 :     class Derived,
      53                 :     class Service,
      54                 :     class Op,
      55                 :     class ConnOp,
      56                 :     class ReadOp,
      57                 :     class WriteOp,
      58                 :     class DescState>
      59                 : class reactor_socket
      60                 :     : public tcp_socket::implementation
      61                 :     , public std::enable_shared_from_this<Derived>
      62                 :     , public intrusive_list<Derived>::node
      63                 : {
      64                 :     friend Derived;
      65                 : 
      66 HIT       22532 :     explicit reactor_socket(Service& svc) noexcept : svc_(svc) {}
      67                 : 
      68                 : protected:
      69                 :     Service& svc_;
      70                 :     int fd_ = -1;
      71                 :     endpoint local_endpoint_;
      72                 :     endpoint remote_endpoint_;
      73                 : 
      74                 : public:
      75                 :     /// Pending connect operation slot.
      76                 :     ConnOp conn_;
      77                 : 
      78                 :     /// Pending read operation slot.
      79                 :     ReadOp rd_;
      80                 : 
      81                 :     /// Pending write operation slot.
      82                 :     WriteOp wr_;
      83                 : 
      84                 :     /// Per-descriptor state for persistent reactor registration.
      85                 :     DescState desc_state_;
      86                 : 
      87           22532 :     ~reactor_socket() override = default;
      88                 : 
      89                 :     /// Return the underlying file descriptor.
      90           45509 :     native_handle_type native_handle() const noexcept override
      91                 :     {
      92           45509 :         return fd_;
      93                 :     }
      94                 : 
      95                 :     /// Return the cached local endpoint.
      96              38 :     endpoint local_endpoint() const noexcept override
      97                 :     {
      98              38 :         return local_endpoint_;
      99                 :     }
     100                 : 
     101                 :     /// Return the cached remote endpoint.
     102              42 :     endpoint remote_endpoint() const noexcept override
     103                 :     {
     104              42 :         return remote_endpoint_;
     105                 :     }
     106                 : 
     107                 :     /// Return true if the socket has an open file descriptor.
     108                 :     bool is_open() const noexcept
     109                 :     {
     110                 :         return fd_ >= 0;
     111                 :     }
     112                 : 
     113                 :     /// Shut down part or all of the full-duplex connection.
     114               6 :     std::error_code shutdown(tcp_socket::shutdown_type what) noexcept override
     115                 :     {
     116                 :         int how;
     117               6 :         switch (what)
     118                 :         {
     119               2 :         case tcp_socket::shutdown_receive:
     120               2 :             how = SHUT_RD;
     121               2 :             break;
     122               2 :         case tcp_socket::shutdown_send:
     123               2 :             how = SHUT_WR;
     124               2 :             break;
     125               2 :         case tcp_socket::shutdown_both:
     126               2 :             how = SHUT_RDWR;
     127               2 :             break;
     128 MIS           0 :         default:
     129               0 :             return make_err(EINVAL);
     130                 :         }
     131 HIT           6 :         if (::shutdown(fd_, how) != 0)
     132 MIS           0 :             return make_err(errno);
     133 HIT           6 :         return {};
     134                 :     }
     135                 : 
     136                 :     /// Set a socket option.
     137              60 :     std::error_code set_option(
     138                 :         int level,
     139                 :         int optname,
     140                 :         void const* data,
     141                 :         std::size_t size) noexcept override
     142                 :     {
     143              60 :         if (::setsockopt(
     144              60 :                 fd_, level, optname, data, static_cast<socklen_t>(size)) != 0)
     145 MIS           0 :             return make_err(errno);
     146 HIT          60 :         return {};
     147                 :     }
     148                 : 
     149                 :     /// Get a socket option.
     150                 :     std::error_code
     151              62 :     get_option(int level, int optname, void* data, std::size_t* size)
     152                 :         const noexcept override
     153                 :     {
     154              62 :         socklen_t len = static_cast<socklen_t>(*size);
     155              62 :         if (::getsockopt(fd_, level, optname, data, &len) != 0)
     156 MIS           0 :             return make_err(errno);
     157 HIT          62 :         *size = static_cast<std::size_t>(len);
     158              62 :         return {};
     159                 :     }
     160                 : 
     161                 :     /// Assign the file descriptor.
     162            7481 :     void set_socket(int fd) noexcept
     163                 :     {
     164            7481 :         fd_ = fd;
     165            7481 :     }
     166                 : 
     167                 :     /// Cache local and remote endpoints.
     168           14962 :     void set_endpoints(endpoint local, endpoint remote) noexcept
     169                 :     {
     170           14962 :         local_endpoint_  = local;
     171           14962 :         remote_endpoint_ = remote;
     172           14962 :     }
     173                 : 
     174                 :     /** Register an op with the reactor.
     175                 : 
     176                 :         Handles cached edge events and deferred cancellation.
     177                 :         Called on the EAGAIN/EINPROGRESS path when speculative
     178                 :         I/O failed.
     179                 :     */
     180                 :     void register_op(
     181                 :         Op& op,
     182                 :         reactor_op_base*& desc_slot,
     183                 :         bool& ready_flag,
     184                 :         bool& cancel_flag) noexcept;
     185                 : 
     186                 :     /** Cancel a single pending operation.
     187                 : 
     188                 :         Claims the operation from its descriptor_state slot under
     189                 :         the mutex and posts it to the scheduler as cancelled.
     190                 : 
     191                 :         @param op The operation to cancel.
     192                 :     */
     193                 :     void cancel_single_op(Op& op) noexcept;
     194                 : 
     195                 :     /** Cancel all pending operations.
     196                 : 
     197                 :         Invoked by the derived class's cancel() override.
     198                 :     */
     199                 :     void do_cancel() noexcept;
     200                 : 
     201                 :     /** Close the socket and cancel pending operations.
     202                 : 
     203                 :         Invoked by the derived class's close_socket(). The
     204                 :         derived class may add backend-specific cleanup after
     205                 :         calling this method.
     206                 :     */
     207                 :     void do_close_socket() noexcept;
     208                 : 
     209                 :     /** Shared connect dispatch.
     210                 : 
     211                 :         Tries the connect syscall speculatively. On synchronous
     212                 :         completion, returns via inline budget or posts through queue.
     213                 :         On EINPROGRESS, registers with the reactor.
     214                 :     */
     215                 :     std::coroutine_handle<> do_connect(
     216                 :         std::coroutine_handle<>,
     217                 :         capy::executor_ref,
     218                 :         endpoint,
     219                 :         std::stop_token const&,
     220                 :         std::error_code*);
     221                 : 
     222                 :     /** Shared scatter-read dispatch.
     223                 : 
     224                 :         Tries readv() speculatively. On success or hard error,
     225                 :         returns via inline budget or posts through queue.
     226                 :         On EAGAIN, registers with the reactor.
     227                 :     */
     228                 :     std::coroutine_handle<> do_read_some(
     229                 :         std::coroutine_handle<>,
     230                 :         capy::executor_ref,
     231                 :         buffer_param,
     232                 :         std::stop_token const&,
     233                 :         std::error_code*,
     234                 :         std::size_t*);
     235                 : 
     236                 :     /** Shared gather-write dispatch.
     237                 : 
     238                 :         Tries the write via WriteOp::write_policy speculatively.
     239                 :         On success or hard error, returns via inline budget or
     240                 :         posts through queue. On EAGAIN, registers with the reactor.
     241                 :     */
     242                 :     std::coroutine_handle<> do_write_some(
     243                 :         std::coroutine_handle<>,
     244                 :         capy::executor_ref,
     245                 :         buffer_param,
     246                 :         std::stop_token const&,
     247                 :         std::error_code*,
     248                 :         std::size_t*);
     249                 : };
     250                 : 
     251                 : template<
     252                 :     class Derived,
     253                 :     class Service,
     254                 :     class Op,
     255                 :     class ConnOp,
     256                 :     class ReadOp,
     257                 :     class WriteOp,
     258                 :     class DescState>
     259                 : void
     260            7879 : reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
     261                 :     register_op(
     262                 :         Op& op,
     263                 :         reactor_op_base*& desc_slot,
     264                 :         bool& ready_flag,
     265                 :         bool& cancel_flag) noexcept
     266                 : {
     267            7879 :     svc_.work_started();
     268                 : 
     269            7879 :     std::lock_guard lock(desc_state_.mutex);
     270            7879 :     bool io_done = false;
     271            7879 :     if (ready_flag)
     272                 :     {
     273             189 :         ready_flag = false;
     274             189 :         op.perform_io();
     275             189 :         io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
     276             189 :         if (!io_done)
     277             189 :             op.errn = 0;
     278                 :     }
     279                 : 
     280            7879 :     if (cancel_flag)
     281                 :     {
     282 MIS           0 :         cancel_flag = false;
     283               0 :         op.cancelled.store(true, std::memory_order_relaxed);
     284                 :     }
     285                 : 
     286 HIT        7879 :     if (io_done || op.cancelled.load(std::memory_order_acquire))
     287                 :     {
     288 MIS           0 :         svc_.post(&op);
     289               0 :         svc_.work_finished();
     290                 :     }
     291                 :     else
     292                 :     {
     293 HIT        7879 :         desc_slot = &op;
     294                 :     }
     295            7879 : }
     296                 : 
     297                 : template<
     298                 :     class Derived,
     299                 :     class Service,
     300                 :     class Op,
     301                 :     class ConnOp,
     302                 :     class ReadOp,
     303                 :     class WriteOp,
     304                 :     class DescState>
     305                 : void
     306             193 : reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
     307                 :     cancel_single_op(Op& op) noexcept
     308                 : {
     309             193 :     auto self = this->weak_from_this().lock();
     310             193 :     if (!self)
     311 MIS           0 :         return;
     312                 : 
     313 HIT         193 :     op.request_cancel();
     314                 : 
     315             193 :     reactor_op_base** desc_op_ptr = nullptr;
     316             193 :     if (&op == &conn_)
     317 MIS           0 :         desc_op_ptr = &desc_state_.connect_op;
     318 HIT         193 :     else if (&op == &rd_)
     319             193 :         desc_op_ptr = &desc_state_.read_op;
     320 MIS           0 :     else if (&op == &wr_)
     321               0 :         desc_op_ptr = &desc_state_.write_op;
     322                 : 
     323 HIT         193 :     if (desc_op_ptr)
     324                 :     {
     325             193 :         reactor_op_base* claimed = nullptr;
     326                 :         {
     327             193 :             std::lock_guard lock(desc_state_.mutex);
     328             193 :             if (*desc_op_ptr == &op)
     329             193 :                 claimed = std::exchange(*desc_op_ptr, nullptr);
     330 MIS           0 :             else if (&op == &conn_)
     331               0 :                 desc_state_.connect_cancel_pending = true;
     332               0 :             else if (&op == &rd_)
     333               0 :                 desc_state_.read_cancel_pending = true;
     334               0 :             else if (&op == &wr_)
     335               0 :                 desc_state_.write_cancel_pending = true;
     336 HIT         193 :         }
     337             193 :         if (claimed)
     338                 :         {
     339             193 :             op.impl_ptr = self;
     340             193 :             svc_.post(&op);
     341             193 :             svc_.work_finished();
     342                 :         }
     343                 :     }
     344             193 : }
     345                 : 
     346                 : template<
     347                 :     class Derived,
     348                 :     class Service,
     349                 :     class Op,
     350                 :     class ConnOp,
     351                 :     class ReadOp,
     352                 :     class WriteOp,
     353                 :     class DescState>
     354                 : void
     355             190 : reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
     356                 :     do_cancel() noexcept
     357                 : {
     358             190 :     auto self = this->weak_from_this().lock();
     359             190 :     if (!self)
     360 MIS           0 :         return;
     361                 : 
     362 HIT         190 :     conn_.request_cancel();
     363             190 :     rd_.request_cancel();
     364             190 :     wr_.request_cancel();
     365                 : 
     366             190 :     reactor_op_base* conn_claimed = nullptr;
     367             190 :     reactor_op_base* rd_claimed   = nullptr;
     368             190 :     reactor_op_base* wr_claimed   = nullptr;
     369                 :     {
     370             190 :         std::lock_guard lock(desc_state_.mutex);
     371             190 :         if (desc_state_.connect_op == &conn_)
     372 MIS           0 :             conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
     373 HIT         190 :         if (desc_state_.read_op == &rd_)
     374              99 :             rd_claimed = std::exchange(desc_state_.read_op, nullptr);
     375             190 :         if (desc_state_.write_op == &wr_)
     376 MIS           0 :             wr_claimed = std::exchange(desc_state_.write_op, nullptr);
     377 HIT         190 :     }
     378                 : 
     379             190 :     if (conn_claimed)
     380                 :     {
     381 MIS           0 :         conn_.impl_ptr = self;
     382               0 :         svc_.post(&conn_);
     383               0 :         svc_.work_finished();
     384                 :     }
     385 HIT         190 :     if (rd_claimed)
     386                 :     {
     387              99 :         rd_.impl_ptr = self;
     388              99 :         svc_.post(&rd_);
     389              99 :         svc_.work_finished();
     390                 :     }
     391             190 :     if (wr_claimed)
     392                 :     {
     393 MIS           0 :         wr_.impl_ptr = self;
     394               0 :         svc_.post(&wr_);
     395               0 :         svc_.work_finished();
     396                 :     }
     397 HIT         190 : }
     398                 : 
     399                 : template<
     400                 :     class Derived,
     401                 :     class Service,
     402                 :     class Op,
     403                 :     class ConnOp,
     404                 :     class ReadOp,
     405                 :     class WriteOp,
     406                 :     class DescState>
     407                 : void
     408           67575 : reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
     409                 :     do_close_socket() noexcept
     410                 : {
     411           67575 :     auto self = this->weak_from_this().lock();
     412           67575 :     if (self)
     413                 :     {
     414           67575 :         conn_.request_cancel();
     415           67575 :         rd_.request_cancel();
     416           67575 :         wr_.request_cancel();
     417                 : 
     418           67575 :         reactor_op_base* conn_claimed = nullptr;
     419           67575 :         reactor_op_base* rd_claimed   = nullptr;
     420           67575 :         reactor_op_base* wr_claimed   = nullptr;
     421                 :         {
     422           67575 :             std::lock_guard lock(desc_state_.mutex);
     423           67575 :             conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
     424           67575 :             rd_claimed   = std::exchange(desc_state_.read_op, nullptr);
     425           67575 :             wr_claimed   = std::exchange(desc_state_.write_op, nullptr);
     426           67575 :             desc_state_.read_ready             = false;
     427           67575 :             desc_state_.write_ready            = false;
     428           67575 :             desc_state_.read_cancel_pending    = false;
     429           67575 :             desc_state_.write_cancel_pending   = false;
     430           67575 :             desc_state_.connect_cancel_pending = false;
     431                 : 
     432                 :             // Keep impl alive while descriptor_state is queued in the
     433                 :             // scheduler. Must be under mutex to avoid racing with
     434                 :             // invoke_deferred_io()'s move of impl_ref_.
     435           67575 :             if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     436             163 :                 desc_state_.impl_ref_ = self;
     437           67575 :         }
     438                 : 
     439           67575 :         if (conn_claimed)
     440                 :         {
     441 MIS           0 :             conn_.impl_ptr = self;
     442               0 :             svc_.post(&conn_);
     443               0 :             svc_.work_finished();
     444                 :         }
     445 HIT       67575 :         if (rd_claimed)
     446                 :         {
     447               2 :             rd_.impl_ptr = self;
     448               2 :             svc_.post(&rd_);
     449               2 :             svc_.work_finished();
     450                 :         }
     451           67575 :         if (wr_claimed)
     452                 :         {
     453 MIS           0 :             wr_.impl_ptr = self;
     454               0 :             svc_.post(&wr_);
     455               0 :             svc_.work_finished();
     456                 :         }
     457                 :     }
     458                 : 
     459 HIT       67575 :     if (fd_ >= 0)
     460                 :     {
     461           14996 :         if (desc_state_.registered_events != 0)
     462           14996 :             svc_.scheduler().deregister_descriptor(fd_);
     463           14996 :         ::close(fd_);
     464           14996 :         fd_ = -1;
     465                 :     }
     466                 : 
     467           67575 :     desc_state_.fd                = -1;
     468           67575 :     desc_state_.registered_events = 0;
     469                 : 
     470           67575 :     local_endpoint_  = endpoint{};
     471           67575 :     remote_endpoint_ = endpoint{};
     472           67575 : }
     473                 : 
     474                 : template<
     475                 :     class Derived,
     476                 :     class Service,
     477                 :     class Op,
     478                 :     class ConnOp,
     479                 :     class ReadOp,
     480                 :     class WriteOp,
     481                 :     class DescState>
     482                 : std::coroutine_handle<>
     483            7485 : reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
     484                 :     do_connect(
     485                 :         std::coroutine_handle<> h,
     486                 :         capy::executor_ref ex,
     487                 :         endpoint ep,
     488                 :         std::stop_token const& token,
     489                 :         std::error_code* ec)
     490                 : {
     491            7485 :     auto& op = conn_;
     492                 : 
     493            7485 :     sockaddr_storage storage{};
     494            7485 :     socklen_t addrlen = to_sockaddr(ep, socket_family(fd_), storage);
     495            7485 :     int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
     496                 : 
     497            7485 :     if (result == 0)
     498                 :     {
     499 MIS           0 :         sockaddr_storage local_storage{};
     500               0 :         socklen_t local_len = sizeof(local_storage);
     501               0 :         if (::getsockname(
     502               0 :                 fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
     503                 :             0)
     504               0 :             local_endpoint_ = from_sockaddr(local_storage);
     505               0 :         remote_endpoint_ = ep;
     506                 :     }
     507                 : 
     508 HIT        7485 :     if (result == 0 || errno != EINPROGRESS)
     509                 :     {
     510 MIS           0 :         int err = (result < 0) ? errno : 0;
     511               0 :         if (svc_.scheduler().try_consume_inline_budget())
     512                 :         {
     513               0 :             *ec = err ? make_err(err) : std::error_code{};
     514               0 :             return dispatch_coro(ex, h);
     515                 :         }
     516               0 :         op.reset();
     517               0 :         op.h               = h;
     518               0 :         op.ex              = ex;
     519               0 :         op.ec_out          = ec;
     520               0 :         op.fd              = fd_;
     521               0 :         op.target_endpoint = ep;
     522               0 :         op.start(token, static_cast<Derived*>(this));
     523               0 :         op.impl_ptr = this->shared_from_this();
     524               0 :         op.complete(err, 0);
     525               0 :         svc_.post(&op);
     526               0 :         return std::noop_coroutine();
     527                 :     }
     528                 : 
     529                 :     // EINPROGRESS — register with reactor
     530 HIT        7485 :     op.reset();
     531            7485 :     op.h               = h;
     532            7485 :     op.ex              = ex;
     533            7485 :     op.ec_out          = ec;
     534            7485 :     op.fd              = fd_;
     535            7485 :     op.target_endpoint = ep;
     536            7485 :     op.start(token, static_cast<Derived*>(this));
     537            7485 :     op.impl_ptr = this->shared_from_this();
     538                 : 
     539            7485 :     register_op(
     540            7485 :         op, desc_state_.connect_op, desc_state_.write_ready,
     541            7485 :         desc_state_.connect_cancel_pending);
     542            7485 :     return std::noop_coroutine();
     543                 : }
     544                 : 
     545                 : template<
     546                 :     class Derived,
     547                 :     class Service,
     548                 :     class Op,
     549                 :     class ConnOp,
     550                 :     class ReadOp,
     551                 :     class WriteOp,
     552                 :     class DescState>
     553                 : std::coroutine_handle<>
     554          253679 : reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
     555                 :     do_read_some(
     556                 :         std::coroutine_handle<> h,
     557                 :         capy::executor_ref ex,
     558                 :         buffer_param param,
     559                 :         std::stop_token const& token,
     560                 :         std::error_code* ec,
     561                 :         std::size_t* bytes_out)
     562                 : {
     563          253679 :     auto& op = rd_;
     564          253679 :     op.reset();
     565                 : 
     566          253679 :     capy::mutable_buffer bufs[ReadOp::max_buffers];
     567          253679 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, ReadOp::max_buffers));
     568                 : 
     569          253679 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     570                 :     {
     571               2 :         op.empty_buffer_read = true;
     572               2 :         op.h                 = h;
     573               2 :         op.ex                = ex;
     574               2 :         op.ec_out            = ec;
     575               2 :         op.bytes_out         = bytes_out;
     576               2 :         op.start(token, static_cast<Derived*>(this));
     577               2 :         op.impl_ptr = this->shared_from_this();
     578               2 :         op.complete(0, 0);
     579               2 :         svc_.post(&op);
     580               2 :         return std::noop_coroutine();
     581                 :     }
     582                 : 
     583          507354 :     for (int i = 0; i < op.iovec_count; ++i)
     584                 :     {
     585          253677 :         op.iovecs[i].iov_base = bufs[i].data();
     586          253677 :         op.iovecs[i].iov_len  = bufs[i].size();
     587                 :     }
     588                 : 
     589                 :     // Speculative read
     590                 :     ssize_t n;
     591                 :     do
     592                 :     {
     593          253677 :         n = ::readv(fd_, op.iovecs, op.iovec_count);
     594                 :     }
     595          253677 :     while (n < 0 && errno == EINTR);
     596                 : 
     597          253677 :     if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
     598                 :     {
     599          253283 :         int err    = (n < 0) ? errno : 0;
     600          253283 :         auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
     601                 : 
     602          253283 :         if (svc_.scheduler().try_consume_inline_budget())
     603                 :         {
     604          202662 :             if (err)
     605 MIS           0 :                 *ec = make_err(err);
     606 HIT      202662 :             else if (n == 0)
     607              10 :                 *ec = capy::error::eof;
     608                 :             else
     609          202652 :                 *ec = {};
     610          202662 :             *bytes_out = bytes;
     611          202662 :             return dispatch_coro(ex, h);
     612                 :         }
     613           50621 :         op.h         = h;
     614           50621 :         op.ex        = ex;
     615           50621 :         op.ec_out    = ec;
     616           50621 :         op.bytes_out = bytes_out;
     617           50621 :         op.start(token, static_cast<Derived*>(this));
     618           50621 :         op.impl_ptr = this->shared_from_this();
     619           50621 :         op.complete(err, bytes);
     620           50621 :         svc_.post(&op);
     621           50621 :         return std::noop_coroutine();
     622                 :     }
     623                 : 
     624                 :     // EAGAIN — register with reactor
     625             394 :     op.h         = h;
     626             394 :     op.ex        = ex;
     627             394 :     op.ec_out    = ec;
     628             394 :     op.bytes_out = bytes_out;
     629             394 :     op.fd        = fd_;
     630             394 :     op.start(token, static_cast<Derived*>(this));
     631             394 :     op.impl_ptr = this->shared_from_this();
     632                 : 
     633             394 :     register_op(
     634             394 :         op, desc_state_.read_op, desc_state_.read_ready,
     635             394 :         desc_state_.read_cancel_pending);
     636             394 :     return std::noop_coroutine();
     637                 : }
     638                 : 
     639                 : template<
     640                 :     class Derived,
     641                 :     class Service,
     642                 :     class Op,
     643                 :     class ConnOp,
     644                 :     class ReadOp,
     645                 :     class WriteOp,
     646                 :     class DescState>
     647                 : std::coroutine_handle<>
     648          253378 : reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
     649                 :     do_write_some(
     650                 :         std::coroutine_handle<> h,
     651                 :         capy::executor_ref ex,
     652                 :         buffer_param param,
     653                 :         std::stop_token const& token,
     654                 :         std::error_code* ec,
     655                 :         std::size_t* bytes_out)
     656                 : {
     657          253378 :     auto& op = wr_;
     658          253378 :     op.reset();
     659                 : 
     660          253378 :     capy::mutable_buffer bufs[WriteOp::max_buffers];
     661          253378 :     op.iovec_count =
     662          253378 :         static_cast<int>(param.copy_to(bufs, WriteOp::max_buffers));
     663                 : 
     664          253378 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     665                 :     {
     666               2 :         op.h         = h;
     667               2 :         op.ex        = ex;
     668               2 :         op.ec_out    = ec;
     669               2 :         op.bytes_out = bytes_out;
     670               2 :         op.start(token, static_cast<Derived*>(this));
     671               2 :         op.impl_ptr = this->shared_from_this();
     672               2 :         op.complete(0, 0);
     673               2 :         svc_.post(&op);
     674               2 :         return std::noop_coroutine();
     675                 :     }
     676                 : 
     677          506752 :     for (int i = 0; i < op.iovec_count; ++i)
     678                 :     {
     679          253376 :         op.iovecs[i].iov_base = bufs[i].data();
     680          253376 :         op.iovecs[i].iov_len  = bufs[i].size();
     681                 :     }
     682                 : 
     683                 :     // Speculative write via backend-specific write policy
     684          253376 :     ssize_t n = WriteOp::write_policy::write(fd_, op.iovecs, op.iovec_count);
     685                 : 
     686          253376 :     if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
     687                 :     {
     688          253376 :         int err    = (n < 0) ? errno : 0;
     689          253376 :         auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
     690                 : 
     691          253376 :         if (svc_.scheduler().try_consume_inline_budget())
     692                 :         {
     693          202719 :             *ec        = err ? make_err(err) : std::error_code{};
     694          202719 :             *bytes_out = bytes;
     695          202719 :             return dispatch_coro(ex, h);
     696                 :         }
     697           50657 :         op.h         = h;
     698           50657 :         op.ex        = ex;
     699           50657 :         op.ec_out    = ec;
     700           50657 :         op.bytes_out = bytes_out;
     701           50657 :         op.start(token, static_cast<Derived*>(this));
     702           50657 :         op.impl_ptr = this->shared_from_this();
     703           50657 :         op.complete(err, bytes);
     704           50657 :         svc_.post(&op);
     705           50657 :         return std::noop_coroutine();
     706                 :     }
     707                 : 
     708                 :     // EAGAIN — register with reactor
     709 MIS           0 :     op.h         = h;
     710               0 :     op.ex        = ex;
     711               0 :     op.ec_out    = ec;
     712               0 :     op.bytes_out = bytes_out;
     713               0 :     op.fd        = fd_;
     714               0 :     op.start(token, static_cast<Derived*>(this));
     715               0 :     op.impl_ptr = this->shared_from_this();
     716                 : 
     717               0 :     register_op(
     718               0 :         op, desc_state_.write_op, desc_state_.write_ready,
     719               0 :         desc_state_.write_cancel_pending);
     720               0 :     return std::noop_coroutine();
     721                 : }
     722                 : 
     723                 : } // namespace boost::corosio::detail
     724                 : 
     725                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SOCKET_HPP
        

Generated by: LCOV version 2.3