TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
12 :
#include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
#include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>

#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#include <utility>

#include <errno.h>
#include <sys/socket.h>
23 :
24 : namespace boost::corosio::detail {
25 :
/// Shared reactor event constants.
/// These match epoll numeric values; kqueue maps its events to the same.
///
/// `inline constexpr` (C++17) gives a single entity across all TUs that
/// include this header; namespace-scope `static constexpr` would create
/// a distinct internal-linkage copy per TU.
inline constexpr std::uint32_t reactor_event_read = 0x001;
inline constexpr std::uint32_t reactor_event_write = 0x004;
inline constexpr std::uint32_t reactor_event_error = 0x008;
31 :
/** Per-descriptor state shared across reactor backends.

    Tracks pending operations for a file descriptor. The fd is registered
    once with the reactor and stays registered until closed. Uses deferred
    I/O: the reactor sets ready_events atomically, then enqueues this state.
    When popped by the scheduler, invoke_deferred_io() performs I/O under
    the mutex and queues completed ops.

    Non-template: uses reactor_op_base pointers so the scheduler and
    descriptor_state code exist as a single copy in the binary regardless
    of how many backends are compiled in.

    Derives from scheduler_op so the state itself can sit in the
    scheduler's completion queue; operator() is the scheduler's entry
    point and destroy() is the non-invoking drain path.

    @par Thread Safety
    The mutex protects operation pointers and ready flags. ready_events_
    and is_enqueued_ are atomic for lock-free reactor access.
*/
struct reactor_descriptor_state : scheduler_op
{
    /// Protects operation pointers and ready/cancel flags.
    std::mutex mutex;

    /// Pending read operation (guarded by `mutex`).
    reactor_op_base* read_op = nullptr;

    /// Pending write operation (guarded by `mutex`).
    reactor_op_base* write_op = nullptr;

    /// Pending connect operation (guarded by `mutex`).
    reactor_op_base* connect_op = nullptr;

    /// True if a read edge event arrived before an op was registered.
    bool read_ready = false;

    /// True if a write edge event arrived before an op was registered.
    bool write_ready = false;

    /// Deferred read cancellation (IOCP-style cancel semantics).
    bool read_cancel_pending = false;

    /// Deferred write cancellation (IOCP-style cancel semantics).
    bool write_cancel_pending = false;

    /// Deferred connect cancellation (IOCP-style cancel semantics).
    bool connect_cancel_pending = false;

    /// Event mask set during registration (no mutex needed).
    std::uint32_t registered_events = 0;

    /// File descriptor this state tracks.
    int fd = -1;

    /// Accumulated ready events (set by reactor, read by scheduler).
    /// OR-accumulates reactor_event_* bits; consumed in one shot by
    /// invoke_deferred_io()'s exchange.
    std::atomic<std::uint32_t> ready_events_{0};

    /// True while this state is queued in the scheduler's completed_ops.
    std::atomic<bool> is_enqueued_{false};

    /// Owning scheduler for posting completions.
    reactor_scheduler_base const* scheduler_ = nullptr;

    /// Prevents impl destruction while queued in the scheduler.
    std::shared_ptr<void> impl_ref_;

    /// Add ready events atomically (called by the reactor thread,
    /// no mutex needed).
    /// Release pairs with the consumer's acquire exchange on
    /// ready_events_ so the consumer sees all flags. On x86 (TSO)
    /// this compiles to the same LOCK OR as relaxed.
    void add_ready_events(std::uint32_t ev) noexcept
    {
        ready_events_.fetch_or(ev, std::memory_order_release);
    }

    /// Scheduler entry point: invoke deferred I/O and dispatch
    /// completions.
    void operator()() override
    {
        invoke_deferred_io();
    }

    /// Destroy without invoking.
    /// Called during scheduler::shutdown() drain. Clear impl_ref_ to break
    /// the self-referential cycle set by close_socket().
    void destroy() override
    {
        impl_ref_.reset();
    }

    /** Perform deferred I/O and queue completions.

        Performs I/O under the mutex and queues completed ops. EAGAIN
        ops stay parked in their slot for re-delivery on the next
        edge event.
    */
    void invoke_deferred_io();
};
126 :
// Runs on a scheduler thread after the reactor enqueued this state.
// Phases: (1) under the mutex, detach impl_ref_ and clear is_enqueued_,
// (2) consume the accumulated event mask, (3) perform the deferred I/O
// per slot, collecting finished ops in local_ops, (4) outside the lock,
// run the first completion inline and post the rest.
inline void
reactor_descriptor_state::invoke_deferred_io()
{
    // Keeps the impl alive until this function returns, even if
    // close_socket() runs concurrently once the mutex is released.
    std::shared_ptr<void> prevent_impl_destruction;
    op_queue local_ops;

    {
        std::lock_guard lock(mutex);

        // Must clear is_enqueued_ and move impl_ref_ under the same
        // lock that processes I/O. close_socket() checks is_enqueued_
        // under this mutex — without atomicity between the flag store
        // and the ref move, close_socket() could see is_enqueued_==false,
        // skip setting impl_ref_, and destroy the impl under us.
        prevent_impl_destruction = std::move(impl_ref_);
        is_enqueued_.store(false, std::memory_order_release);

        // Acquire pairs with the reactor's release fetch_or in
        // add_ready_events(); exchange consumes all accumulated bits.
        std::uint32_t ev =
            ready_events_.exchange(0, std::memory_order_acquire);
        if (ev == 0)
        {
            // Mutex unlocks here; compensate for work_cleanup's decrement
            scheduler_->compensating_work_started();
            return;
        }

        // On an error event, fetch the socket error up front so every
        // slot below completes with the same errno.
        int err = 0;
        if (ev & reactor_event_error)
        {
            socklen_t len = sizeof(err);
            if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
                err = errno;
            // Error event with no SO_ERROR: report a generic I/O error
            // rather than completing with success.
            if (err == 0)
                err = EIO;
        }

        if (ev & reactor_event_read)
        {
            if (read_op)
            {
                auto* rd = read_op;
                if (err)
                    rd->complete(err, 0);
                else
                    rd->perform_io();

                // Spurious readiness: leave the op parked in its slot
                // for the next edge event instead of completing it.
                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
                {
                    rd->errn = 0;
                }
                else
                {
                    read_op = nullptr;
                    local_ops.push(rd);
                }
            }
            else
            {
                // Edge arrived before an op was registered; remember it
                // so the next read op can run immediately.
                read_ready = true;
            }
        }
        if (ev & reactor_event_write)
        {
            // Capture before processing: writability consumed by a
            // connect or write op must not also set write_ready.
            bool had_write_op = (connect_op || write_op);
            if (connect_op)
            {
                // Connect resolves on the first writability event
                // (success or failure) — never re-parked on EAGAIN.
                auto* cn = connect_op;
                if (err)
                    cn->complete(err, 0);
                else
                    cn->perform_io();
                connect_op = nullptr;
                local_ops.push(cn);
            }
            if (write_op)
            {
                auto* wr = write_op;
                if (err)
                    wr->complete(err, 0);
                else
                    wr->perform_io();

                // Same parking rule as reads: EAGAIN keeps the op in
                // its slot for the next edge event.
                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
                {
                    wr->errn = 0;
                }
                else
                {
                    write_op = nullptr;
                    local_ops.push(wr);
                }
            }
            if (!had_write_op)
                write_ready = true;
        }
        // Error delivery for slots not covered by the read/write events
        // above (e.g. error-only wakeup): fail every remaining op.
        if (err)
        {
            if (read_op)
            {
                read_op->complete(err, 0);
                local_ops.push(std::exchange(read_op, nullptr));
            }
            if (write_op)
            {
                write_op->complete(err, 0);
                local_ops.push(std::exchange(write_op, nullptr));
            }
            if (connect_op)
            {
                connect_op->complete(err, 0);
                local_ops.push(std::exchange(connect_op, nullptr));
            }
        }
    }

    // Execute first handler inline — the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        // Nothing completed (all ops parked or only readiness flags
        // recorded): restore the scheduler's work count.
        scheduler_->compensating_work_started();
    }
}
255 :
256 : } // namespace boost::corosio::detail
257 :
258 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DESCRIPTOR_STATE_HPP
|