LCOV - code coverage report
Current view: top level - corosio/native/detail/reactor - reactor_descriptor_state.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 74.3 % 74 55 19
Test Date: 2026-03-13 23:11:04 Functions: 100.0 % 4 4

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
      12                 : 
      13                 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
      14                 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
      15                 : 
      16                 : #include <atomic>
      17                 : #include <cstdint>
      18                 : #include <memory>
      19                 : #include <mutex>
      20                 : 
      21                 : #include <errno.h>
      22                 : #include <sys/socket.h>
      23                 : 
      24                 : namespace boost::corosio::detail {
      25                 : 
      26                 : /// Shared reactor event constants, used as bit flags in a ready-event mask.
      27                 : /// The values match epoll's numeric event values; kqueue maps its events onto the same values.
      28                 : static constexpr std::uint32_t reactor_event_read  = 0x001;
      29                 : static constexpr std::uint32_t reactor_event_write = 0x004;
      30                 : static constexpr std::uint32_t reactor_event_error = 0x008;
      31                 : 
      32                 : /** Per-descriptor state shared across reactor backends.
      33                 : 
      34                 :     Tracks pending operations for a file descriptor. The fd is registered
      35                 :     once with the reactor and stays registered until closed. Uses deferred
      36                 :     I/O: the reactor sets ready_events atomically, then enqueues this state.
      37                 :     When popped by the scheduler, invoke_deferred_io() performs I/O under
      38                 :     the mutex and queues completed ops.
      39                 : 
      40                 :     Non-template: uses reactor_op_base pointers so the scheduler and
      41                 :     descriptor_state code exist as a single copy in the binary regardless
      42                 :     of how many backends are compiled in.
      43                 : 
      44                 :     @par Thread Safety
      45                 :     The mutex protects operation pointers and ready flags. ready_events_
      46                 :     and is_enqueued_ are atomic for lock-free reactor access.
      47                 : */
      48                 : struct reactor_descriptor_state : scheduler_op
      49                 : {
      50                 :     /// Protects operation pointers and ready/cancel flags.
      51                 :     std::mutex mutex;
      52                 : 
      53                 :     /// Pending read operation (guarded by `mutex`).
      54                 :     reactor_op_base* read_op = nullptr;
      55                 : 
      56                 :     /// Pending write operation (guarded by `mutex`).
      57                 :     reactor_op_base* write_op = nullptr;
      58                 : 
      59                 :     /// Pending connect operation (guarded by `mutex`).
      60                 :     reactor_op_base* connect_op = nullptr;
      61                 : 
      62                 :     /// True if a read edge event arrived while no read op was registered (guarded by `mutex`).
      63                 :     bool read_ready = false;
      64                 : 
      65                 :     /// True if a write edge event arrived while neither a connect nor a write op was registered (guarded by `mutex`).
      66                 :     bool write_ready = false;
      67                 : 
      68                 :     /// Deferred read cancellation (IOCP-style cancel semantics).
      69                 :     bool read_cancel_pending = false;
      70                 : 
      71                 :     /// Deferred write cancellation (IOCP-style cancel semantics).
      72                 :     bool write_cancel_pending = false;
      73                 : 
      74                 :     /// Deferred connect cancellation (IOCP-style cancel semantics).
      75                 :     bool connect_cancel_pending = false;
      76                 : 
      77                 :     /// Event mask set during registration (no mutex needed).
      78                 :     std::uint32_t registered_events = 0;
      79                 : 
      80                 :     /// File descriptor this state tracks (-1 until assigned); queried with SO_ERROR when an error event is processed.
      81                 :     int fd = -1;
      82                 : 
      83                 :     /// Accumulated reactor_event_* bits: OR-ed in by add_ready_events(), drained to 0 by invoke_deferred_io().
      84                 :     std::atomic<std::uint32_t> ready_events_{0};
      85                 : 
      86                 :     /// True while this state is queued in the scheduler's completed_ops; cleared by invoke_deferred_io() under `mutex`.
      87                 :     std::atomic<bool> is_enqueued_{false};
      88                 : 
      89                 :     /// Owning scheduler for posting completions. Dereferenced without a null check by invoke_deferred_io().
      90                 :     reactor_scheduler_base const* scheduler_ = nullptr;
      91                 : 
      92                 :     /// Prevents impl destruction while queued in the scheduler. Moved out by invoke_deferred_io(), reset by destroy().
      93                 :     std::shared_ptr<void> impl_ref_;
      94                 : 
      95                 :     /// Add ready events atomically by OR-ing `ev` into ready_events_.
      96                 :     /// Release pairs with the consumer's acquire exchange on
      97                 :     /// ready_events_ so the consumer sees all flags. On x86 (TSO)
      98                 :     /// this compiles to the same LOCK OR as relaxed.
      99 HIT      185812 :     void add_ready_events(std::uint32_t ev) noexcept
     100                 :     {
     101          185812 :         ready_events_.fetch_or(ev, std::memory_order_release);
     102          185812 :     }
     103                 : 
     104                 :     /// Invoke deferred I/O and dispatch completions; forwards to invoke_deferred_io().
     105          185760 :     void operator()() override
     106                 :     {
     107          185760 :         invoke_deferred_io();
     108          185760 :     }
     109                 : 
     110                 :     /// Destroy without invoking.
     111                 :     /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
     112                 :     /// the self-referential cycle set by close_socket().
     113              52 :     void destroy() override
     114                 :     {
     115              52 :         impl_ref_.reset();
     116              52 :     }
     117                 : 
     118                 :     /** Perform deferred I/O and queue completions.
     119                 : 
     120                 :         Drains ready_events_ and performs I/O under the mutex. Ops that
     121                 :         report EAGAIN/EWOULDBLOCK stay parked in their slot for re-delivery
     122                 :         on the next edge event. The first finished op is invoked inline; the rest are posted to the scheduler.
     123                 :     */
     124                 :     void invoke_deferred_io();
     125                 : };
     126                 : 
     127                 : inline void
     128          185760 : reactor_descriptor_state::invoke_deferred_io()
     129                 : {
     130          185760 :     std::shared_ptr<void> prevent_impl_destruction;
     131          185760 :     op_queue local_ops;
     132                 :     // The inner scope bounds `lock`: the mutex is released before completions are posted or run below.
     133                 :     {
     134          185760 :         std::lock_guard lock(mutex);
     135                 : 
     136                 :         // Must clear is_enqueued_ and move impl_ref_ under the same
     137                 :         // lock that processes I/O. close_socket() checks is_enqueued_
     138                 :         // under this mutex — without atomicity between the flag store
     139                 :         // and the ref move, close_socket() could see is_enqueued_==false,
     140                 :         // skip setting impl_ref_, and destroy the impl under us.
     141          185760 :         prevent_impl_destruction = std::move(impl_ref_);
     142          185760 :         is_enqueued_.store(false, std::memory_order_release);
     143                 :         // Take (and zero) every event bit accumulated in ready_events_.
     144                 :         std::uint32_t ev =
     145          185760 :             ready_events_.exchange(0, std::memory_order_acquire);
     146          185760 :         if (ev == 0)
     147                 :         {
     148                 :             // No events to process. The early return releases `lock`; compensate for work_cleanup's decrement
     149 MIS           0 :             scheduler_->compensating_work_started();
     150               0 :             return;
     151                 :         }
     152                 :         // Error event: fetch SO_ERROR (errno if the query fails, EIO if it reports 0); parked ops are completed with `err` below.
     153 HIT      185760 :         int err = 0;
     154          185760 :         if (ev & reactor_event_error)
     155                 :         {
     156               1 :             socklen_t len = sizeof(err);
     157               1 :             if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     158 MIS           0 :                 err = errno;
     159 HIT           1 :             if (err == 0)
     160 MIS           0 :                 err = EIO;
     161                 :         }
     162                 :         // Read event: run the parked read op, or latch read_ready when no read op is registered.
     163 HIT      185760 :         if (ev & reactor_event_read)
     164                 :         {
     165          152789 :             if (read_op)
     166                 :             {
     167            7621 :                 auto* rd = read_op;
     168            7621 :                 if (err)
     169 MIS           0 :                     rd->complete(err, 0);
     170                 :                 else
     171 HIT        7621 :                     rd->perform_io();
     172                 :                 // EAGAIN/EWOULDBLOCK: reset errn and leave the op parked in its slot; otherwise queue it as completed.
     173            7621 :                 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
     174                 :                 {
     175              44 :                     rd->errn = 0;
     176                 :                 }
     177                 :                 else
     178                 :                 {
     179            7577 :                     read_op = nullptr;
     180            7577 :                     local_ops.push(rd);
     181                 :                 }
     182                 :             }
     183                 :             else
     184                 :             {
     185          145168 :                 read_ready = true;
     186                 :             }
     187                 :         }
     188          185760 :         if (ev & reactor_event_write)
     189                 :         {
     190           41892 :             bool had_write_op = (connect_op || write_op);
     191           41892 :             if (connect_op)
     192                 :             {
     193            7485 :                 auto* cn = connect_op;
     194            7485 :                 if (err)
     195               1 :                     cn->complete(err, 0);
     196                 :                 else
     197            7484 :                     cn->perform_io();
     198            7485 :                 connect_op = nullptr;
     199            7485 :                 local_ops.push(cn);
     200                 :             }
     201           41892 :             if (write_op)
     202                 :             {
     203 MIS           0 :                 auto* wr = write_op;
     204               0 :                 if (err)
     205               0 :                     wr->complete(err, 0);
     206                 :                 else
     207               0 :                     wr->perform_io();
     208                 :                 // Same parking rule as reads; connect_op above is instead always removed from its slot and queued.
     209               0 :                 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
     210                 :                 {
     211               0 :                     wr->errn = 0;
     212                 :                 }
     213                 :                 else
     214                 :                 {
     215               0 :                     write_op = nullptr;
     216               0 :                     local_ops.push(wr);
     217                 :                 }
     218                 :             }
     219 HIT       41892 :             if (!had_write_op)
     220           34407 :                 write_ready = true;
     221                 :         }
     222          185760 :         if (err)
     223                 :         {
     224               1 :             if (read_op)
     225                 :             {
     226 MIS           0 :                 read_op->complete(err, 0);
     227               0 :                 local_ops.push(std::exchange(read_op, nullptr));
     228                 :             }
     229 HIT           1 :             if (write_op)
     230                 :             {
     231 MIS           0 :                 write_op->complete(err, 0);
     232               0 :                 local_ops.push(std::exchange(write_op, nullptr));
     233                 :             }
     234 HIT           1 :             if (connect_op)
     235                 :             {
     236 MIS           0 :                 connect_op->complete(err, 0);
     237               0 :                 local_ops.push(std::exchange(connect_op, nullptr));
     238                 :             }
     239                 :         }
     240 HIT      185760 :     }
     241                 :     // NOTE(review): std::exchange above is declared in <utility>, which this header does not include directly.
     242                 :     // Run the first completed op inline (the scheduler's work_cleanup accounts for it as the "consumed"
     243                 :     // work item) after posting the rest; with nothing completed, call compensating_work_started() instead.
     244          185760 :     scheduler_op* first = local_ops.pop();
     245          185760 :     if (first)
     246                 :     {
     247           15062 :         scheduler_->post_deferred_completions(local_ops);
     248           15062 :         (*first)();
     249                 :     }
     250                 :     else
     251                 :     {
     252          170698 :         scheduler_->compensating_work_started();
     253                 :     }
     254          185760 : }
     255                 : 
     256                 : } // namespace boost::corosio::detail
     257                 : 
     258                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
        

Generated by: LCOV version 2.3