include/boost/corosio/native/detail/reactor/reactor_op.hpp

85.1% Lines (143/168) 87.5% List of functions (28/32)
f(x) Functions (32)
Function Calls Lines Branches Blocks
boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_acceptor>::canceller::operator()() const :56 0 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_socket, boost::corosio::detail::select_acceptor>::canceller::operator()() const :56 0 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_acceptor>::reactor_op() :86 0 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_socket, boost::corosio::detail::select_acceptor>::reactor_op() :86 0 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_acceptor>::reset() :89 0 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_socket, boost::corosio::detail::select_acceptor>::reset() :89 0 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_acceptor>::is_read_operation() const :101 0 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_socket, boost::corosio::detail::select_acceptor>::is_read_operation() const :101 0 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_acceptor>::destroy() :110 0 0.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_socket, boost::corosio::detail::select_acceptor>::destroy() :110 0 0.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_acceptor>::start(std::stop_token const&, boost::corosio::detail::epoll_socket*) :117 0 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_socket, boost::corosio::detail::select_acceptor>::start(std::stop_token const&, boost::corosio::detail::select_socket*) :117 0 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_acceptor>::start(std::stop_token const&, 
boost::corosio::detail::epoll_acceptor*) :129 0 100.0% boost::corosio::detail::reactor_op<boost::corosio::detail::select_socket, boost::corosio::detail::select_acceptor>::start(std::stop_token const&, boost::corosio::detail::select_acceptor*) :129 0 87.5% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::epoll_op>::reset() :155 0 100.0% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::select_op>::reset() :155 0 100.0% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::epoll_op>::perform_io() :161 0 85.7% boost::corosio::detail::reactor_connect_op<boost::corosio::detail::select_op>::perform_io() :161 0 85.7% boost::corosio::detail::reactor_read_op<boost::corosio::detail::epoll_op>::is_read_operation() const :193 0 100.0% boost::corosio::detail::reactor_read_op<boost::corosio::detail::select_op>::is_read_operation() const :193 0 100.0% boost::corosio::detail::reactor_read_op<boost::corosio::detail::epoll_op>::reset() :198 0 100.0% boost::corosio::detail::reactor_read_op<boost::corosio::detail::select_op>::reset() :198 0 100.0% boost::corosio::detail::reactor_read_op<boost::corosio::detail::epoll_op>::perform_io() :205 0 100.0% boost::corosio::detail::reactor_read_op<boost::corosio::detail::select_op>::perform_io() :205 0 100.0% boost::corosio::detail::reactor_write_op<boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_write_policy>::reset() :244 0 100.0% boost::corosio::detail::reactor_write_op<boost::corosio::detail::select_op, boost::corosio::detail::select_write_policy>::reset() :244 0 100.0% boost::corosio::detail::reactor_write_op<boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_write_policy>::perform_io() :250 0 0.0% boost::corosio::detail::reactor_write_op<boost::corosio::detail::select_op, boost::corosio::detail::select_write_policy>::perform_io() :250 0 0.0% boost::corosio::detail::reactor_accept_op<boost::corosio::detail::epoll_op, 
boost::corosio::detail::epoll_accept_policy>::reset() :283 0 100.0% boost::corosio::detail::reactor_accept_op<boost::corosio::detail::select_op, boost::corosio::detail::select_accept_policy>::reset() :283 0 100.0% boost::corosio::detail::reactor_accept_op<boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_accept_policy>::perform_io() :292 0 85.7% boost::corosio::detail::reactor_accept_op<boost::corosio::detail::select_op, boost::corosio::detail::select_accept_policy>::perform_io() :292 0 85.7%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
12
13 #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
14 #include <boost/corosio/io/io_object.hpp>
15 #include <boost/corosio/endpoint.hpp>
16 #include <boost/capy/ex/executor_ref.hpp>
17
18 #include <atomic>
19 #include <coroutine>
20 #include <cstddef>
21 #include <memory>
22 #include <optional>
23 #include <stop_token>
24 #include <system_error>
25
26 #include <errno.h>
27
28 #include <netinet/in.h>
29 #include <sys/socket.h>
30 #include <sys/uio.h>
31
32 namespace boost::corosio::detail {
33
34 /** Base operation for reactor-based backends.
35
36 Holds per-operation state that depends on the concrete backend
37 socket/acceptor types: coroutine handle, executor, output
38 pointers, file descriptor, stop_callback, and type-specific
39 impl pointers.
40
41 Fields shared across all backends (errn, bytes_transferred,
42 cancelled, impl_ptr, perform_io, complete) live in
43 reactor_op_base so the scheduler and descriptor_state can
44 access them without template instantiation.
45
46 @tparam Socket The backend socket impl type (forward-declared).
47 @tparam Acceptor The backend acceptor impl type (forward-declared).
48 */
49 template<class Socket, class Acceptor>
50 struct reactor_op : reactor_op_base
51 {
52 /// Stop-token callback that invokes cancel() on the target op.
53 struct canceller
54 {
55 reactor_op* op;
56 199x void operator()() const noexcept
57 {
58 199x op->cancel();
59 199x }
60 };
61
62 /// Caller's coroutine handle to resume on completion.
63 std::coroutine_handle<> h;
64
65 /// Executor for dispatching the completion.
66 capy::executor_ref ex;
67
68 /// Output pointer for the error code.
69 std::error_code* ec_out = nullptr;
70
71 /// Output pointer for bytes transferred.
72 std::size_t* bytes_out = nullptr;
73
74 /// File descriptor this operation targets.
75 int fd = -1;
76
77 /// Stop-token callback registration.
78 std::optional<std::stop_callback<canceller>> stop_cb;
79
80 /// Owning socket impl (for stop_token cancellation).
81 Socket* socket_impl_ = nullptr;
82
83 /// Owning acceptor impl (for stop_token cancellation).
84 Acceptor* acceptor_impl_ = nullptr;
85
86 67737x reactor_op() = default;
87
88 /// Reset operation state for reuse.
89 522035x void reset() noexcept
90 {
91 522035x fd = -1;
92 522035x errn = 0;
93 522035x bytes_transferred = 0;
94 522035x cancelled.store(false, std::memory_order_relaxed);
95 522035x impl_ptr.reset();
96 522035x socket_impl_ = nullptr;
97 522035x acceptor_impl_ = nullptr;
98 522035x }
99
100 /// Return true if this is a read-direction operation.
101 50651x virtual bool is_read_operation() const noexcept
102 {
103 50651x return false;
104 }
105
106 /// Cancel this operation via the owning impl.
107 virtual void cancel() noexcept = 0;
108
109 /// Destroy without invoking.
110 void destroy() override
111 {
112 stop_cb.reset();
113 reactor_op_base::destroy();
114 }
115
116 /// Arm the stop-token callback for a socket operation.
117 109161x void start(std::stop_token const& token, Socket* impl)
118 {
119 109161x cancelled.store(false, std::memory_order_release);
120 109161x stop_cb.reset();
121 109161x socket_impl_ = impl;
122 109161x acceptor_impl_ = nullptr;
123
124 109161x if (token.stop_possible())
125 195x stop_cb.emplace(token, canceller{this});
126 109161x }
127
128 /// Arm the stop-token callback for an acceptor operation.
129 7493x void start(std::stop_token const& token, Acceptor* impl)
130 {
131 7493x cancelled.store(false, std::memory_order_release);
132 7493x stop_cb.reset();
133 7493x socket_impl_ = nullptr;
134 7493x acceptor_impl_ = impl;
135
136 7493x if (token.stop_possible())
137 9x stop_cb.emplace(token, canceller{this});
138 7493x }
139 };
140
141 /** Shared connect operation.
142
143 Checks SO_ERROR for connect completion status. The operator()()
144 and cancel() are provided by the concrete backend type.
145
146 @tparam Base The backend's base op type.
147 */
148 template<class Base>
149 struct reactor_connect_op : Base
150 {
151 /// Endpoint to connect to.
152 endpoint target_endpoint;
153
154 /// Reset operation state for reuse.
155 7485x void reset() noexcept
156 {
157 7485x Base::reset();
158 7485x target_endpoint = endpoint{};
159 7485x }
160
161 7484x void perform_io() noexcept override
162 {
163 7484x int err = 0;
164 7484x socklen_t len = sizeof(err);
165 7484x if (::getsockopt(this->fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
166 err = errno;
167 7484x this->complete(err, 0);
168 7484x }
169 };
170
/** Scatter-read operation shared by the reactor backends.

    Reads with readv(), retrying while the call is interrupted
    by a signal (EINTR).

    @tparam Base The backend's base op type.
*/
template<class Base>
struct reactor_read_op : Base
{
    /// Capacity of the iovecs array.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors passed to readv().
    iovec iovecs[max_buffers];

    /// Number of entries of iovecs that are in use.
    int iovec_count = 0;

    /// True for zero-length reads (completed immediately).
    bool empty_buffer_read = false;

    /// Return true unless this is a zero-length (empty buffer) read.
    bool is_read_operation() const noexcept override
    {
        return !empty_buffer_read;
    }

    /// Reset the base state and clear the buffer bookkeeping for reuse.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
        empty_buffer_read = false;
    }

    /** Perform the read and report the outcome through complete().

        On success completes with error 0 and the number of bytes
        read; on failure completes with errno and a byte count of 0.
        An EINTR failure is not reported; the read is retried.
    */
    void perform_io() noexcept override
    {
        for (;;)
        {
            ssize_t const got = ::readv(this->fd, iovecs, iovec_count);
            if (got >= 0)
            {
                this->complete(0, static_cast<std::size_t>(got));
                return;
            }
            if (errno != EINTR)
            {
                this->complete(errno, 0);
                return;
            }
            // Interrupted by a signal: issue the read again.
        }
    }
};
220
/** Gather-write operation shared by the reactor backends.

    The syscall itself is issued by WritePolicy::write(fd, iovecs, count),
    which returns ssize_t (bytes written or -1 with errno set).

    @tparam Base The backend's base op type.
    @tparam WritePolicy Provides `static ssize_t write(int, iovec*, int)`.
*/
template<class Base, class WritePolicy>
struct reactor_write_op : Base
{
    /// Alias exposing the write syscall policy.
    using write_policy = WritePolicy;

    /// Capacity of the iovecs array.
    static constexpr std::size_t max_buffers = 16;

    /// Scatter-gather I/O vectors handed to the write policy.
    iovec iovecs[max_buffers];

    /// Number of entries of iovecs that are in use.
    int iovec_count = 0;

    /// Reset the base state and clear the buffer count for reuse.
    void reset() noexcept
    {
        Base::reset();
        iovec_count = 0;
    }

    /** Perform the write and report the outcome through complete().

        On success completes with error 0 and the number of bytes
        written; on failure completes with errno and a byte count
        of 0.
    */
    void perform_io() noexcept override
    {
        ssize_t const sent =
            WritePolicy::write(this->fd, iovecs, iovec_count);
        if (sent < 0)
        {
            this->complete(errno, 0);
            return;
        }
        this->complete(0, static_cast<std::size_t>(sent));
    }
};
259
260 /** Shared accept operation.
261
262 Delegates the actual syscall to AcceptPolicy::do_accept(fd, peer_storage),
263 which returns the accepted fd or -1 with errno set.
264
265 @tparam Base The backend's base op type.
266 @tparam AcceptPolicy Provides `static int do_accept(int, sockaddr_storage&)`.
267 */
268 template<class Base, class AcceptPolicy>
269 struct reactor_accept_op : Base
270 {
271 /// File descriptor of the accepted connection.
272 int accepted_fd = -1;
273
274 /// Pointer to the peer socket implementation.
275 io_object::implementation* peer_impl = nullptr;
276
277 /// Output pointer for the accepted implementation.
278 io_object::implementation** impl_out = nullptr;
279
280 /// Peer address storage filled by accept.
281 sockaddr_storage peer_storage{};
282
283 7493x void reset() noexcept
284 {
285 7493x Base::reset();
286 7493x accepted_fd = -1;
287 7493x peer_impl = nullptr;
288 7493x impl_out = nullptr;
289 7493x peer_storage = {};
290 7493x }
291
292 7477x void perform_io() noexcept override
293 {
294 7477x int new_fd = AcceptPolicy::do_accept(this->fd, peer_storage);
295 7477x if (new_fd >= 0)
296 {
297 7477x accepted_fd = new_fd;
298 7477x this->complete(0, 0);
299 }
300 else
301 {
302 this->complete(errno, 0);
303 }
304 7477x }
305 };
306
307 } // namespace boost::corosio::detail
308
309 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_HPP
310