TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
12 :
13 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
14 : #include <boost/corosio/io/io_object.hpp>
15 : #include <boost/corosio/endpoint.hpp>
16 : #include <boost/capy/ex/executor_ref.hpp>
17 :
18 : #include <atomic>
19 : #include <coroutine>
20 : #include <cstddef>
21 : #include <memory>
22 : #include <optional>
23 : #include <stop_token>
24 : #include <system_error>
25 :
26 : #include <errno.h>
27 :
28 : #include <netinet/in.h>
29 : #include <sys/socket.h>
30 : #include <sys/uio.h>
31 :
32 : namespace boost::corosio::detail {
33 :
34 : /** Base operation for reactor-based backends.
35 :
36 : Holds per-operation state that depends on the concrete backend
37 : socket/acceptor types: coroutine handle, executor, output
38 : pointers, file descriptor, stop_callback, and type-specific
39 : impl pointers.
40 :
41 : Fields shared across all backends (errn, bytes_transferred,
42 : cancelled, impl_ptr, perform_io, complete) live in
43 : reactor_op_base so the scheduler and descriptor_state can
44 : access them without template instantiation.
45 :
46 : @tparam Socket The backend socket impl type (forward-declared).
47 : @tparam Acceptor The backend acceptor impl type (forward-declared).
48 : */
49 : template<class Socket, class Acceptor>
50 : struct reactor_op : reactor_op_base
51 : {
52 : /// Stop-token callback that invokes cancel() on the target op.
53 : struct canceller
54 : {
55 : reactor_op* op;
56 HIT 199 : void operator()() const noexcept
57 : {
58 199 : op->cancel();
59 199 : }
60 : };
61 :
62 : /// Caller's coroutine handle to resume on completion.
63 : std::coroutine_handle<> h;
64 :
65 : /// Executor for dispatching the completion.
66 : capy::executor_ref ex;
67 :
68 : /// Output pointer for the error code.
69 : std::error_code* ec_out = nullptr;
70 :
71 : /// Output pointer for bytes transferred.
72 : std::size_t* bytes_out = nullptr;
73 :
74 : /// File descriptor this operation targets.
75 : int fd = -1;
76 :
77 : /// Stop-token callback registration.
78 : std::optional<std::stop_callback<canceller>> stop_cb;
79 :
80 : /// Owning socket impl (for stop_token cancellation).
81 : Socket* socket_impl_ = nullptr;
82 :
83 : /// Owning acceptor impl (for stop_token cancellation).
84 : Acceptor* acceptor_impl_ = nullptr;
85 :
86 67737 : reactor_op() = default;
87 :
88 : /// Reset operation state for reuse.
89 522035 : void reset() noexcept
90 : {
91 522035 : fd = -1;
92 522035 : errn = 0;
93 522035 : bytes_transferred = 0;
94 522035 : cancelled.store(false, std::memory_order_relaxed);
95 522035 : impl_ptr.reset();
96 522035 : socket_impl_ = nullptr;
97 522035 : acceptor_impl_ = nullptr;
98 522035 : }
99 :
100 : /// Return true if this is a read-direction operation.
101 50651 : virtual bool is_read_operation() const noexcept
102 : {
103 50651 : return false;
104 : }
105 :
106 : /// Cancel this operation via the owning impl.
107 : virtual void cancel() noexcept = 0;
108 :
109 : /// Destroy without invoking.
110 MIS 0 : void destroy() override
111 : {
112 0 : stop_cb.reset();
113 0 : reactor_op_base::destroy();
114 0 : }
115 :
116 : /// Arm the stop-token callback for a socket operation.
117 HIT 109161 : void start(std::stop_token const& token, Socket* impl)
118 : {
119 109161 : cancelled.store(false, std::memory_order_release);
120 109161 : stop_cb.reset();
121 109161 : socket_impl_ = impl;
122 109161 : acceptor_impl_ = nullptr;
123 :
124 109161 : if (token.stop_possible())
125 195 : stop_cb.emplace(token, canceller{this});
126 109161 : }
127 :
128 : /// Arm the stop-token callback for an acceptor operation.
129 7493 : void start(std::stop_token const& token, Acceptor* impl)
130 : {
131 7493 : cancelled.store(false, std::memory_order_release);
132 7493 : stop_cb.reset();
133 7493 : socket_impl_ = nullptr;
134 7493 : acceptor_impl_ = impl;
135 :
136 7493 : if (token.stop_possible())
137 9 : stop_cb.emplace(token, canceller{this});
138 7493 : }
139 : };
140 :
141 : /** Shared connect operation.
142 :
143 : Checks SO_ERROR for connect completion status. The operator()()
144 : and cancel() are provided by the concrete backend type.
145 :
146 : @tparam Base The backend's base op type.
147 : */
148 : template<class Base>
149 : struct reactor_connect_op : Base
150 : {
151 : /// Endpoint to connect to.
152 : endpoint target_endpoint;
153 :
154 : /// Reset operation state for reuse.
155 7485 : void reset() noexcept
156 : {
157 7485 : Base::reset();
158 7485 : target_endpoint = endpoint{};
159 7485 : }
160 :
161 7484 : void perform_io() noexcept override
162 : {
163 7484 : int err = 0;
164 7484 : socklen_t len = sizeof(err);
165 7484 : if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
166 MIS 0 : err = errno;
167 HIT 7484 : this->complete(err, 0);
168 7484 : }
169 : };
170 :
171 : /** Shared scatter-read operation.
172 :
173 : Uses readv() with an EINTR retry loop.
174 :
175 : @tparam Base The backend's base op type.
176 : */
177 : template<class Base>
178 : struct reactor_read_op : Base
179 : {
180 : /// Maximum scatter-gather buffer count.
181 : static constexpr std::size_t max_buffers = 16;
182 :
183 : /// Scatter-gather I/O vectors.
184 : iovec iovecs[max_buffers];
185 :
186 : /// Number of active I/O vectors.
187 : int iovec_count = 0;
188 :
189 : /// True for zero-length reads (completed immediately).
190 : bool empty_buffer_read = false;
191 :
192 : /// Return true (this is a read-direction operation).
193 50715 : bool is_read_operation() const noexcept override
194 : {
195 50715 : return !empty_buffer_read;
196 : }
197 :
198 253679 : void reset() noexcept
199 : {
200 253679 : Base::reset();
201 253679 : iovec_count = 0;
202 253679 : empty_buffer_read = false;
203 253679 : }
204 :
205 333 : void perform_io() noexcept override
206 : {
207 : ssize_t n;
208 : do
209 : {
210 333 : n = ::readv(this->fd, iovecs, iovec_count);
211 : }
212 333 : while (n < 0 && errno == EINTR);
213 :
214 333 : if (n >= 0)
215 100 : this->complete(0, static_cast<std::size_t>(n));
216 : else
217 233 : this->complete(errno, 0);
218 333 : }
219 : };
220 :
221 : /** Shared gather-write operation.
222 :
223 : Delegates the actual syscall to WritePolicy::write(fd, iovecs, count),
224 : which returns ssize_t (bytes written or -1 with errno set).
225 :
226 : @tparam Base The backend's base op type.
227 : @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
228 : */
229 : template<class Base, class WritePolicy>
230 : struct reactor_write_op : Base
231 : {
232 : /// The write syscall policy type.
233 : using write_policy = WritePolicy;
234 :
235 : /// Maximum scatter-gather buffer count.
236 : static constexpr std::size_t max_buffers = 16;
237 :
238 : /// Scatter-gather I/O vectors.
239 : iovec iovecs[max_buffers];
240 :
241 : /// Number of active I/O vectors.
242 : int iovec_count = 0;
243 :
244 253378 : void reset() noexcept
245 : {
246 253378 : Base::reset();
247 253378 : iovec_count = 0;
248 253378 : }
249 :
250 MIS 0 : void perform_io() noexcept override
251 : {
252 0 : ssize_t n = WritePolicy::write(this->fd, iovecs, iovec_count);
253 0 : if (n >= 0)
254 0 : this->complete(0, static_cast<std::size_t>(n));
255 : else
256 0 : this->complete(errno, 0);
257 0 : }
258 : };
259 :
260 : /** Shared accept operation.
261 :
262 : Delegates the actual syscall to AcceptPolicy::do_accept(fd, peer_storage),
263 : which returns the accepted fd or -1 with errno set.
264 :
265 : @tparam Base The backend's base op type.
266 : @tparam AcceptPolicy Provides `static int do_accept(int, sockaddr_storage&)`.
267 : */
268 : template<class Base, class AcceptPolicy>
269 : struct reactor_accept_op : Base
270 : {
271 : /// File descriptor of the accepted connection.
272 : int accepted_fd = -1;
273 :
274 : /// Pointer to the peer socket implementation.
275 : io_object::implementation* peer_impl = nullptr;
276 :
277 : /// Output pointer for the accepted implementation.
278 : io_object::implementation** impl_out = nullptr;
279 :
280 : /// Peer address storage filled by accept.
281 : sockaddr_storage peer_storage{};
282 :
283 HIT 7493 : void reset() noexcept
284 : {
285 7493 : Base::reset();
286 7493 : accepted_fd = -1;
287 7493 : peer_impl = nullptr;
288 7493 : impl_out = nullptr;
289 7493 : peer_storage = {};
290 7493 : }
291 :
292 7477 : void perform_io() noexcept override
293 : {
294 7477 : int new_fd = AcceptPolicy::do_accept(this->fd, peer_storage);
295 7477 : if (new_fd >= 0)
296 : {
297 7477 : accepted_fd = new_fd;
298 7477 : this->complete(0, 0);
299 : }
300 : else
301 : {
302 MIS 0 : this->complete(errno, 0);
303 : }
304 HIT 7477 : }
305 : };
306 :
307 : } // namespace boost::corosio::detail
308 :
309 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
|