include/boost/corosio/native/detail/reactor/reactor_socket.hpp

77.9% Lines (434/557) 100.0% List of functions (32/32)
f(x) Functions (32)
Function Calls Lines Branches Blocks
boost::corosio::detail::reactor_socket<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_socket_service, boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::reactor_socket(boost::corosio::detail::epoll_socket_service&) :66 0 100.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::select_socket, boost::corosio::detail::select_socket_service, boost::corosio::detail::select_op, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::reactor_socket(boost::corosio::detail::select_socket_service&) :66 0 100.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_socket_service, boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::~reactor_socket() :87 0 100.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::select_socket, boost::corosio::detail::select_socket_service, boost::corosio::detail::select_op, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::~reactor_socket() :87 0 100.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_socket_service, boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::native_handle() const :90 0 100.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::select_socket, 
boost::corosio::detail::select_socket_service, boost::corosio::detail::select_op, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::native_handle() const :90 0 100.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_socket_service, boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::local_endpoint() const :96 0 100.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::select_socket, boost::corosio::detail::select_socket_service, boost::corosio::detail::select_op, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::local_endpoint() const :96 0 100.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_socket_service, boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::remote_endpoint() const :102 0 100.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::select_socket, boost::corosio::detail::select_socket_service, boost::corosio::detail::select_op, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::remote_endpoint() const :102 0 100.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_socket_service, boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, 
boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::shutdown(boost::corosio::tcp_socket::shutdown_type) :114 0 81.2% boost::corosio::detail::reactor_socket<boost::corosio::detail::select_socket, boost::corosio::detail::select_socket_service, boost::corosio::detail::select_op, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::shutdown(boost::corosio::tcp_socket::shutdown_type) :114 0 81.2% boost::corosio::detail::reactor_socket<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_socket_service, boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::set_option(int, int, void const*, unsigned long) :137 0 80.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::select_socket, boost::corosio::detail::select_socket_service, boost::corosio::detail::select_op, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::set_option(int, int, void const*, unsigned long) :137 0 80.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_socket_service, boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::get_option(int, int, void*, unsigned long*) const :151 0 83.3% boost::corosio::detail::reactor_socket<boost::corosio::detail::select_socket, boost::corosio::detail::select_socket_service, boost::corosio::detail::select_op, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, 
boost::corosio::detail::select_descriptor_state>::get_option(int, int, void*, unsigned long*) const :151 0 83.3% boost::corosio::detail::reactor_socket<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_socket_service, boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::set_socket(int) :162 0 100.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::select_socket, boost::corosio::detail::select_socket_service, boost::corosio::detail::select_op, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::set_socket(int) :162 0 100.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_socket_service, boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::set_endpoints(boost::corosio::endpoint, boost::corosio::endpoint) :168 0 100.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::select_socket, boost::corosio::detail::select_socket_service, boost::corosio::detail::select_op, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::set_endpoints(boost::corosio::endpoint, boost::corosio::endpoint) :168 0 100.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_socket_service, boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, 
boost::corosio::detail::descriptor_state>::register_op(boost::corosio::detail::epoll_op&, boost::corosio::detail::reactor_op_base*&, bool&, bool&) :260 0 77.8% boost::corosio::detail::reactor_socket<boost::corosio::detail::select_socket, boost::corosio::detail::select_socket_service, boost::corosio::detail::select_op, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::register_op(boost::corosio::detail::select_op&, boost::corosio::detail::reactor_op_base*&, bool&, bool&) :260 0 77.8% boost::corosio::detail::reactor_socket<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_socket_service, boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::cancel_single_op(boost::corosio::detail::epoll_op&) :306 0 65.5% boost::corosio::detail::reactor_socket<boost::corosio::detail::select_socket, boost::corosio::detail::select_socket_service, boost::corosio::detail::select_op, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::cancel_single_op(boost::corosio::detail::select_op&) :306 0 65.5% boost::corosio::detail::reactor_socket<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_socket_service, boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::do_cancel() :355 0 71.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::select_socket, boost::corosio::detail::select_socket_service, boost::corosio::detail::select_op, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, 
boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::do_cancel() :355 0 71.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_socket_service, boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::do_close_socket() :408 0 86.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::select_socket, boost::corosio::detail::select_socket_service, boost::corosio::detail::select_op, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::do_close_socket() :408 0 86.0% boost::corosio::detail::reactor_socket<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_socket_service, boost::corosio::detail::epoll_op, boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::do_connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::stop_token const&, std::error_code*) :483 0 47.5% boost::corosio::detail::reactor_socket<boost::corosio::detail::select_socket, boost::corosio::detail::select_socket_service, boost::corosio::detail::select_op, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::do_connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::stop_token const&, std::error_code*) :483 0 47.5% boost::corosio::detail::reactor_socket<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_socket_service, boost::corosio::detail::epoll_op, 
boost::corosio::detail::epoll_connect_op, boost::corosio::detail::epoll_read_op, boost::corosio::detail::epoll_write_op, boost::corosio::detail::descriptor_state>::do_read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token const&, std::error_code*, unsigned long*) :554 0 98.1% boost::corosio::detail::reactor_socket<boost::corosio::detail::select_socket, boost::corosio::detail::select_socket_service, boost::corosio::detail::select_op, boost::corosio::detail::select_connect_op, boost::corosio::detail::select_read_op, boost::corosio::detail::select_write_op, boost::corosio::detail::select_descriptor_state>::do_read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token const&, std::error_code*, unsigned long*) :554 0 98.1%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SOCKET_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SOCKET_HPP
12
13 #include <boost/corosio/tcp_socket.hpp>
14 #include <boost/corosio/detail/intrusive.hpp>
15 #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
16 #include <boost/corosio/native/detail/make_err.hpp>
17 #include <boost/corosio/native/detail/endpoint_convert.hpp>
18 #include <boost/corosio/detail/dispatch_coro.hpp>
19 #include <boost/capy/buffers.hpp>
20
21 #include <coroutine>
22 #include <memory>
23 #include <mutex>
24 #include <utility>
25
26 #include <errno.h>
27 #include <netinet/in.h>
28 #include <sys/socket.h>
29 #include <sys/uio.h>
30 #include <unistd.h>
31
32 namespace boost::corosio::detail {
33
34 /** CRTP base for reactor-backed socket implementations.
35
36 Provides shared data members, trivial virtual overrides,
37 non-virtual helper methods for cancellation, registration,
38 close, and the full I/O dispatch logic (`do_connect`,
39 `do_read_some`, `do_write_some`). Concrete backends inherit
40 and add `cancel()`, `close_socket()`, and I/O overrides that
41 delegate to the `do_*` helpers.
42
43 @tparam Derived The concrete socket type (CRTP).
44 @tparam Service The backend's socket service type.
45 @tparam Op The backend's base op type.
46 @tparam ConnOp The backend's connect op type.
47 @tparam ReadOp The backend's read op type.
48 @tparam WriteOp The backend's write op type.
49 @tparam DescState The backend's descriptor_state type.
50 */
51 template<
52 class Derived,
53 class Service,
54 class Op,
55 class ConnOp,
56 class ReadOp,
57 class WriteOp,
58 class DescState>
59 class reactor_socket
60 : public tcp_socket::implementation
61 , public std::enable_shared_from_this<Derived>
62 , public intrusive_list<Derived>::node
63 {
// Derived is a friend so it can reach the constructor below, which is
// declared before any access specifier and is therefore private.
64 friend Derived;
65
// Only binds the service reference; fd_ keeps its in-class default of -1.
66 22532x explicit reactor_socket(Service& svc) noexcept : svc_(svc) {}
67
68 protected:
69 Service& svc_;
// -1 is the "no descriptor" sentinel tested by is_open() and restored by
// do_close_socket().
70 int fd_ = -1;
71 endpoint local_endpoint_;
72 endpoint remote_endpoint_;
73
74 public:
75 /// Pending connect operation slot.
76 ConnOp conn_;
77
78 /// Pending read operation slot.
79 ReadOp rd_;
80
81 /// Pending write operation slot.
82 WriteOp wr_;
83
84 /// Per-descriptor state for persistent reactor registration.
85 DescState desc_state_;
86
87 22532x ~reactor_socket() override = default;
88
89 /// Return the underlying file descriptor.
90 45509x native_handle_type native_handle() const noexcept override
91 {
92 45509x return fd_;
93 }
94
95 /// Return the cached local endpoint.
96 38x endpoint local_endpoint() const noexcept override
97 {
98 38x return local_endpoint_;
99 }
100
101 /// Return the cached remote endpoint.
102 42x endpoint remote_endpoint() const noexcept override
103 {
104 42x return remote_endpoint_;
105 }
106
107 /// Return true if the socket has an open file descriptor.
108 bool is_open() const noexcept
109 {
110 return fd_ >= 0;
111 }
112
113 /// Shut down part or all of the full-duplex connection.
// Translates the tcp_socket::shutdown_type value into the matching SHUT_*
// constant. An unrecognised value returns EINVAL without calling
// ::shutdown(); a failing ::shutdown() is reported through errno.
114 6x std::error_code shutdown(tcp_socket::shutdown_type what) noexcept override
115 {
// Left uninitialised on purpose: every switch arm either assigns it or
// returns before it is read.
116 int how;
117 6x switch (what)
118 {
119 2x case tcp_socket::shutdown_receive:
120 2x how = SHUT_RD;
121 2x break;
122 2x case tcp_socket::shutdown_send:
123 2x how = SHUT_WR;
124 2x break;
125 2x case tcp_socket::shutdown_both:
126 2x how = SHUT_RDWR;
127 2x break;
128 default:
129 return make_err(EINVAL);
130 }
131 6x if (::shutdown(fd_, how) != 0)
132 return make_err(errno);
133 6x return {};
134 }
135
136 /// Set a socket option.
// Thin forwarder to ::setsockopt(); `size` is narrowed to socklen_t and a
// non-zero return is converted from errno.
137 60x std::error_code set_option(
138 int level,
139 int optname,
140 void const* data,
141 std::size_t size) noexcept override
142 {
143 60x if (::setsockopt(
144 60x fd_, level, optname, data, static_cast<socklen_t>(size)) != 0)
145 return make_err(errno);
146 60x return {};
147 }
148
149 /// Get a socket option.
// `*size` is in/out: it is read as the buffer length passed to
// ::getsockopt() and overwritten with the returned length on success.
// On failure `*size` is left untouched. `size` is dereferenced without a
// null check.
150 std::error_code
151 62x get_option(int level, int optname, void* data, std::size_t* size)
152 const noexcept override
153 {
154 62x socklen_t len = static_cast<socklen_t>(*size);
155 62x if (::getsockopt(fd_, level, optname, data, &len) != 0)
156 return make_err(errno);
157 62x *size = static_cast<std::size_t>(len);
158 62x return {};
159 }
160
161 /// Assign the file descriptor.
// Stores the value only; nothing is closed, opened, or registered here.
162 7481x void set_socket(int fd) noexcept
163 {
164 7481x fd_ = fd;
165 7481x }
166
167 /// Cache local and remote endpoints.
168 14962x void set_endpoints(endpoint local, endpoint remote) noexcept
169 {
170 14962x local_endpoint_ = local;
171 14962x remote_endpoint_ = remote;
172 14962x }
173
174 /** Register an op with the reactor.
175
176 Handles cached edge events and deferred cancellation.
177 Called on the EAGAIN/EINPROGRESS path when speculative
178 I/O failed.
179 */
180 void register_op(
181 Op& op,
182 reactor_op_base*& desc_slot,
183 bool& ready_flag,
184 bool& cancel_flag) noexcept;
185
186 /** Cancel a single pending operation.
187
188 Claims the operation from its descriptor_state slot under
189 the mutex and posts it to the scheduler as cancelled.
190
191 @param op The operation to cancel.
192 */
193 void cancel_single_op(Op& op) noexcept;
194
195 /** Cancel all pending operations.
196
197 Invoked by the derived class's cancel() override.
198 */
199 void do_cancel() noexcept;
200
201 /** Close the socket and cancel pending operations.
202
203 Invoked by the derived class's close_socket(). The
204 derived class may add backend-specific cleanup after
205 calling this method.
206 */
207 void do_close_socket() noexcept;
208
209 /** Shared connect dispatch.
210
211 Tries the connect syscall speculatively. On synchronous
212 completion, returns via inline budget or posts through queue.
213 On EINPROGRESS, registers with the reactor.
214 */
215 std::coroutine_handle<> do_connect(
216 std::coroutine_handle<>,
217 capy::executor_ref,
218 endpoint,
219 std::stop_token const&,
220 std::error_code*);
221
222 /** Shared scatter-read dispatch.
223
224 Tries readv() speculatively. On success or hard error,
225 returns via inline budget or posts through queue.
226 On EAGAIN, registers with the reactor.
227 */
228 std::coroutine_handle<> do_read_some(
229 std::coroutine_handle<>,
230 capy::executor_ref,
231 buffer_param,
232 std::stop_token const&,
233 std::error_code*,
234 std::size_t*);
235
236 /** Shared gather-write dispatch.
237
238 Tries the write via WriteOp::write_policy speculatively.
239 On success or hard error, returns via inline budget or
240 posts through queue. On EAGAIN, registers with the reactor.
241 */
242 std::coroutine_handle<> do_write_some(
243 std::coroutine_handle<>,
244 capy::executor_ref,
245 buffer_param,
246 std::stop_token const&,
247 std::error_code*,
248 std::size_t*);
249 };
250
// Parks `op` in `desc_slot`, or posts it immediately when it has already
// finished or been cancelled.
//
// op          - the operation to register.
// desc_slot   - the descriptor_state pointer that will hold `op` while it
//               waits (connect_op / read_op / write_op at the call sites).
// ready_flag  - consumed here; when set, the I/O is retried before parking.
// cancel_flag - consumed here; when set, `op` is marked cancelled. This is
//               the *_cancel_pending flag that cancel_single_op() raises
//               when it finds the slot not yet holding the op.
251 template<
252 class Derived,
253 class Service,
254 class Op,
255 class ConnOp,
256 class ReadOp,
257 class WriteOp,
258 class DescState>
259 void
260 7879x reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
261 register_op(
262 Op& op,
263 reactor_op_base*& desc_slot,
264 bool& ready_flag,
265 bool& cancel_flag) noexcept
266 {
// Announced before taking the lock; the immediate-post branch below calls
// work_finished() itself, the parked branch does not.
267 7879x svc_.work_started();
268
// Everything from here to the end of the function runs under the
// descriptor mutex, including the perform_io() retry and the post.
269 7879x std::lock_guard lock(desc_state_.mutex);
270 7879x bool io_done = false;
271 7879x if (ready_flag)
272 {
273 189x ready_flag = false;
274 189x op.perform_io();
// Anything other than "would block" counts as a finished attempt
// (success or hard error).
275 189x io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
// Still not ready: clear the transient error so the op is parked clean.
276 189x if (!io_done)
277 189x op.errn = 0;
278 }
279
280 7879x if (cancel_flag)
281 {
282 cancel_flag = false;
283 op.cancelled.store(true, std::memory_order_relaxed);
284 }
285
286 7879x if (io_done || op.cancelled.load(std::memory_order_acquire))
287 {
288 svc_.post(&op);
289 svc_.work_finished();
290 }
291 else
292 {
// Publish the op; from now on cancel/close paths can claim it from the slot.
293 7879x desc_slot = &op;
294 }
295 7879x }
296
// Requests cancellation of one op and, if it is currently parked in its
// descriptor slot, removes it from the slot and posts it.
//
// `op` is expected to be one of conn_, rd_ or wr_ (matched by address). For
// any other object only op.request_cancel() takes effect.
297 template<
298 class Derived,
299 class Service,
300 class Op,
301 class ConnOp,
302 class ReadOp,
303 class WriteOp,
304 class DescState>
305 void
306 193x reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
307 cancel_single_op(Op& op) noexcept
308 {
// Obtain an owning reference without throwing; if no shared_ptr owns this
// object any more, there is nothing to cancel.
309 193x auto self = this->weak_from_this().lock();
310 193x if (!self)
311 return;
312
313 193x op.request_cancel();
314
// Pick the descriptor slot that corresponds to this op.
315 193x reactor_op_base** desc_op_ptr = nullptr;
316 193x if (&op == &conn_)
317 desc_op_ptr = &desc_state_.connect_op;
318 193x else if (&op == &rd_)
319 193x desc_op_ptr = &desc_state_.read_op;
320 else if (&op == &wr_)
321 desc_op_ptr = &desc_state_.write_op;
322
323 193x if (desc_op_ptr)
324 {
325 193x reactor_op_base* claimed = nullptr;
326 {
327 193x std::lock_guard lock(desc_state_.mutex);
// Slot holds the op: take it out so no other path can complete it.
328 193x if (*desc_op_ptr == &op)
329 193x claimed = std::exchange(*desc_op_ptr, nullptr);
// Slot does not hold the op: leave a pending-cancel flag, which
// register_op() consumes as its cancel_flag argument.
330 else if (&op == &conn_)
331 desc_state_.connect_cancel_pending = true;
332 else if (&op == &rd_)
333 desc_state_.read_cancel_pending = true;
334 else if (&op == &wr_)
335 desc_state_.write_cancel_pending = true;
336 193x }
// Posting happens after the mutex is released.
337 193x if (claimed)
338 {
339 193x op.impl_ptr = self;
340 193x svc_.post(&op);
341 193x svc_.work_finished();
342 }
343 }
344 193x }
345
// Requests cancellation of the connect, read and write ops, then posts every
// one of them that was parked in its descriptor slot.
//
// Unlike cancel_single_op(), this does not set any *_cancel_pending flag for
// ops that are not in their slot.
346 template<
347 class Derived,
348 class Service,
349 class Op,
350 class ConnOp,
351 class ReadOp,
352 class WriteOp,
353 class DescState>
354 void
355 190x reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
356 do_cancel() noexcept
357 {
// No owning shared_ptr means nothing to do; return before touching the ops.
358 190x auto self = this->weak_from_this().lock();
359 190x if (!self)
360 return;
361
362 190x conn_.request_cancel();
363 190x rd_.request_cancel();
364 190x wr_.request_cancel();
365
366 190x reactor_op_base* conn_claimed = nullptr;
367 190x reactor_op_base* rd_claimed = nullptr;
368 190x reactor_op_base* wr_claimed = nullptr;
369 {
// Claim each slot only if it currently points at this socket's own op.
370 190x std::lock_guard lock(desc_state_.mutex);
371 190x if (desc_state_.connect_op == &conn_)
372 conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
373 190x if (desc_state_.read_op == &rd_)
374 99x rd_claimed = std::exchange(desc_state_.read_op, nullptr);
375 190x if (desc_state_.write_op == &wr_)
376 wr_claimed = std::exchange(desc_state_.write_op, nullptr);
377 190x }
378
// Claimed ops are posted after the mutex is released. Each gets an owning
// reference to this socket in impl_ptr first.
379 190x if (conn_claimed)
380 {
381 conn_.impl_ptr = self;
382 svc_.post(&conn_);
383 svc_.work_finished();
384 }
385 190x if (rd_claimed)
386 {
387 99x rd_.impl_ptr = self;
388 99x svc_.post(&rd_);
389 99x svc_.work_finished();
390 }
391 190x if (wr_claimed)
392 {
393 wr_.impl_ptr = self;
394 svc_.post(&wr_);
395 svc_.work_finished();
396 }
397 190x }
398
// Cancels and posts any parked ops, resets the descriptor state, closes the
// file descriptor if one is open, and clears the cached endpoints.
//
// The cancellation half runs only when a shared_ptr still owns this object.
// The descriptor close and state reset run unconditionally.
399 template<
400 class Derived,
401 class Service,
402 class Op,
403 class ConnOp,
404 class ReadOp,
405 class WriteOp,
406 class DescState>
407 void
408 67575x reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
409 do_close_socket() noexcept
410 {
411 67575x auto self = this->weak_from_this().lock();
412 67575x if (self)
413 {
414 67575x conn_.request_cancel();
415 67575x rd_.request_cancel();
416 67575x wr_.request_cancel();
417
418 67575x reactor_op_base* conn_claimed = nullptr;
419 67575x reactor_op_base* rd_claimed = nullptr;
420 67575x reactor_op_base* wr_claimed = nullptr;
421 {
422 67575x std::lock_guard lock(desc_state_.mutex);
// Slots are emptied unconditionally here (do_cancel() compares against
// the op address first; this function does not).
423 67575x conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
424 67575x rd_claimed = std::exchange(desc_state_.read_op, nullptr);
425 67575x wr_claimed = std::exchange(desc_state_.write_op, nullptr);
// Drop cached readiness and pending-cancel flags so none of them carries
// over past this close.
426 67575x desc_state_.read_ready = false;
427 67575x desc_state_.write_ready = false;
428 67575x desc_state_.read_cancel_pending = false;
429 67575x desc_state_.write_cancel_pending = false;
430 67575x desc_state_.connect_cancel_pending = false;
431
432 // Keep impl alive while descriptor_state is queued in the
433 // scheduler. Must be under mutex to avoid racing with
434 // invoke_deferred_io()'s move of impl_ref_.
435 67575x if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
436 163x desc_state_.impl_ref_ = self;
437 67575x }
438
// As in do_cancel(): post claimed ops after the mutex is released.
439 67575x if (conn_claimed)
440 {
441 conn_.impl_ptr = self;
442 svc_.post(&conn_);
443 svc_.work_finished();
444 }
445 67575x if (rd_claimed)
446 {
447 2x rd_.impl_ptr = self;
448 2x svc_.post(&rd_);
449 2x svc_.work_finished();
450 }
451 67575x if (wr_claimed)
452 {
453 wr_.impl_ptr = self;
454 svc_.post(&wr_);
455 svc_.work_finished();
456 }
457 }
458
459 67575x if (fd_ >= 0)
460 {
// Deregister only if the descriptor was registered for some event, and do
// it before ::close(). registered_events is read here without holding
// desc_state_.mutex. The return value of ::close() is ignored.
461 14996x if (desc_state_.registered_events != 0)
462 14996x svc_.scheduler().deregister_descriptor(fd_);
463 14996x ::close(fd_);
464 14996x fd_ = -1;
465 }
466
// Reset unconditionally, so calling this on an already-closed socket leaves
// the same end state.
467 67575x desc_state_.fd = -1;
468 67575x desc_state_.registered_events = 0;
469
470 67575x local_endpoint_ = endpoint{};
471 67575x remote_endpoint_ = endpoint{};
472 67575x }
473
// Starts a connect to `ep` and returns the coroutine handle to resume next.
//
// h     - coroutine to resume when the connect completes.
// ex    - executor passed to dispatch_coro() / stored on the op.
// ep    - target endpoint.
// token - stop token handed to op.start().
// ec    - receives the result on the inline path; otherwise stored on the op
//         as ec_out.
//
// Returns dispatch_coro(ex, h) when the connect finished synchronously and
// inline budget was available; std::noop_coroutine() in every other case
// (the op has been posted or registered).
474 template<
475 class Derived,
476 class Service,
477 class Op,
478 class ConnOp,
479 class ReadOp,
480 class WriteOp,
481 class DescState>
482 std::coroutine_handle<>
483 7485x reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
484 do_connect(
485 std::coroutine_handle<> h,
486 capy::executor_ref ex,
487 endpoint ep,
488 std::stop_token const& token,
489 std::error_code* ec)
490 {
491 7485x auto& op = conn_;
492
493 7485x sockaddr_storage storage{};
494 7485x socklen_t addrlen = to_sockaddr(ep, socket_family(fd_), storage);
495 7485x int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
496
497 7485x if (result == 0)
498 {
// Immediate success: cache both endpoints. If getsockname() fails, the
// local endpoint keeps its previous value.
499 sockaddr_storage local_storage{};
500 socklen_t local_len = sizeof(local_storage);
501 if (::getsockname(
502 fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
503 0)
504 local_endpoint_ = from_sockaddr(local_storage);
505 remote_endpoint_ = ep;
506 }
507
// Short-circuit matters: errno is inspected only when connect() failed, so
// the getsockname() call above cannot disturb the value being tested.
508 7485x if (result == 0 || errno != EINPROGRESS)
509 {
// Capture errno before any further call can overwrite it.
510 int err = (result < 0) ? errno : 0;
511 if (svc_.scheduler().try_consume_inline_budget())
512 {
513 *ec = err ? make_err(err) : std::error_code{};
514 return dispatch_coro(ex, h);
515 }
// No inline budget: complete the op with the captured result and post it.
// This path does not go through register_op(), so work_started() is not
// called here.
516 op.reset();
517 op.h = h;
518 op.ex = ex;
519 op.ec_out = ec;
520 op.fd = fd_;
521 op.target_endpoint = ep;
522 op.start(token, static_cast<Derived*>(this));
523 op.impl_ptr = this->shared_from_this();
524 op.complete(err, 0);
525 svc_.post(&op);
526 return std::noop_coroutine();
527 }
528
529 // EINPROGRESS — register with reactor
530 7485x op.reset();
531 7485x op.h = h;
532 7485x op.ex = ex;
533 7485x op.ec_out = ec;
534 7485x op.fd = fd_;
535 7485x op.target_endpoint = ep;
536 7485x op.start(token, static_cast<Derived*>(this));
537 7485x op.impl_ptr = this->shared_from_this();
538
// The connect op shares the write-readiness flag but has its own slot and
// its own pending-cancel flag.
539 7485x register_op(
540 7485x op, desc_state_.connect_op, desc_state_.write_ready,
541 7485x desc_state_.connect_cancel_pending);
542 7485x return std::noop_coroutine();
543 }
544
// Starts a scatter read into the buffers described by `param` and returns
// the coroutine handle to resume next.
//
// h         - coroutine to resume on completion.
// ex        - executor passed to dispatch_coro() / stored on the op.
// param     - buffer sequence; at most ReadOp::max_buffers entries are used.
// token     - stop token handed to op.start().
// ec        - receives the result on the inline path; otherwise stored on the op.
// bytes_out - receives the byte count on the inline path; otherwise stored on the op.
//
// Returns dispatch_coro(ex, h) when the read finished synchronously and
// inline budget was available; std::noop_coroutine() otherwise.
545 template<
546 class Derived,
547 class Service,
548 class Op,
549 class ConnOp,
550 class ReadOp,
551 class WriteOp,
552 class DescState>
553 std::coroutine_handle<>
554 253679x reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
555 do_read_some(
556 std::coroutine_handle<> h,
557 capy::executor_ref ex,
558 buffer_param param,
559 std::stop_token const& token,
560 std::error_code* ec,
561 std::size_t* bytes_out)
562 {
563 253679x auto& op = rd_;
564 253679x op.reset();
565
566 253679x capy::mutable_buffer bufs[ReadOp::max_buffers];
567 253679x op.iovec_count = static_cast<int>(param.copy_to(bufs, ReadOp::max_buffers));
568
// No buffers, or a single zero-length buffer: skip the syscall. The op is
// flagged as an empty-buffer read, completed with (0, 0) and always posted;
// this path never uses the inline budget.
569 253679x if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
570 {
571 2x op.empty_buffer_read = true;
572 2x op.h = h;
573 2x op.ex = ex;
574 2x op.ec_out = ec;
575 2x op.bytes_out = bytes_out;
576 2x op.start(token, static_cast<Derived*>(this));
577 2x op.impl_ptr = this->shared_from_this();
578 2x op.complete(0, 0);
579 2x svc_.post(&op);
580 2x return std::noop_coroutine();
581 }
582
// Mirror the buffer descriptors into the op's iovec array for readv().
583 507354x for (int i = 0; i < op.iovec_count; ++i)
584 {
585 253677x op.iovecs[i].iov_base = bufs[i].data();
586 253677x op.iovecs[i].iov_len = bufs[i].size();
587 }
588
589 // Speculative read
// Retried for as long as the call is interrupted by a signal (EINTR).
590 ssize_t n;
591 do
592 {
593 253677x n = ::readv(fd_, op.iovecs, op.iovec_count);
594 }
595 253677x while (n < 0 && errno == EINTR);
596
// Success (including 0 bytes) or a hard error: finish now. Only a
// would-block result falls through to reactor registration.
597 253677x if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
598 {
// Capture errno before calling anything else.
599 253283x int err = (n < 0) ? errno : 0;
600 253283x auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
601
602 253283x if (svc_.scheduler().try_consume_inline_budget())
603 {
// Inline completion: a successful zero-byte read is reported as
// end-of-file.
604 202662x if (err)
605 *ec = make_err(err);
606 202662x else if (n == 0)
607 10x *ec = capy::error::eof;
608 else
609 202652x *ec = {};
610 202662x *bytes_out = bytes;
611 202662x return dispatch_coro(ex, h);
612 }
// No inline budget: hand the raw (err, bytes) pair to the op and post it.
// NOTE(review): EOF mapping for this path is presumably done inside the
// op's completion — confirm against ReadOp.
613 50621x op.h = h;
614 50621x op.ex = ex;
615 50621x op.ec_out = ec;
616 50621x op.bytes_out = bytes_out;
617 50621x op.start(token, static_cast<Derived*>(this));
618 50621x op.impl_ptr = this->shared_from_this();
619 50621x op.complete(err, bytes);
620 50621x svc_.post(&op);
621 50621x return std::noop_coroutine();
622 }
623
624 // EAGAIN — register with reactor
625 394x op.h = h;
626 394x op.ex = ex;
627 394x op.ec_out = ec;
628 394x op.bytes_out = bytes_out;
629 394x op.fd = fd_;
630 394x op.start(token, static_cast<Derived*>(this));
631 394x op.impl_ptr = this->shared_from_this();
632
633 394x register_op(
634 394x op, desc_state_.read_op, desc_state_.read_ready,
635 394x desc_state_.read_cancel_pending);
636 394x return std::noop_coroutine();
637 }
638
// Starts a gather write of the buffers described by `param` and returns the
// coroutine handle to resume next.
//
// h         - coroutine to resume on completion.
// ex        - executor passed to dispatch_coro() / stored on the op.
// param     - buffer sequence; at most WriteOp::max_buffers entries are used.
// token     - stop token handed to op.start().
// ec        - receives the result on the inline path; otherwise stored on the op.
// bytes_out - receives the byte count on the inline path; otherwise stored on the op.
//
// Returns dispatch_coro(ex, h) when the write finished synchronously and
// inline budget was available; std::noop_coroutine() otherwise.
639 template<
640 class Derived,
641 class Service,
642 class Op,
643 class ConnOp,
644 class ReadOp,
645 class WriteOp,
646 class DescState>
647 std::coroutine_handle<>
648 253378x reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
649 do_write_some(
650 std::coroutine_handle<> h,
651 capy::executor_ref ex,
652 buffer_param param,
653 std::stop_token const& token,
654 std::error_code* ec,
655 std::size_t* bytes_out)
656 {
657 253378x auto& op = wr_;
658 253378x op.reset();
659
660 253378x capy::mutable_buffer bufs[WriteOp::max_buffers];
661 253378x op.iovec_count =
662 253378x static_cast<int>(param.copy_to(bufs, WriteOp::max_buffers));
663
// No buffers, or a single zero-length buffer: skip the syscall, complete the
// op with (0, 0) and post it. This path never uses the inline budget.
664 253378x if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
665 {
666 2x op.h = h;
667 2x op.ex = ex;
668 2x op.ec_out = ec;
669 2x op.bytes_out = bytes_out;
670 2x op.start(token, static_cast<Derived*>(this));
671 2x op.impl_ptr = this->shared_from_this();
672 2x op.complete(0, 0);
673 2x svc_.post(&op);
674 2x return std::noop_coroutine();
675 }
676
// Mirror the buffer descriptors into the op's iovec array.
677 506752x for (int i = 0; i < op.iovec_count; ++i)
678 {
679 253376x op.iovecs[i].iov_base = bufs[i].data();
680 253376x op.iovecs[i].iov_len = bufs[i].size();
681 }
682
683 // Speculative write via backend-specific write policy
// There is no EINTR retry loop at this level (do_read_some has one).
// NOTE(review): presumably the write policy handles EINTR itself — confirm.
684 253376x ssize_t n = WriteOp::write_policy::write(fd_, op.iovecs, op.iovec_count);
685
// Success or a hard error: finish now. Only a would-block result falls
// through to reactor registration.
686 253376x if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
687 {
// Capture errno before calling anything else.
688 253376x int err = (n < 0) ? errno : 0;
689 253376x auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
690
691 253376x if (svc_.scheduler().try_consume_inline_budget())
692 {
// Inline completion. Unlike the read path, a zero-byte result is not
// mapped to EOF here.
693 202719x *ec = err ? make_err(err) : std::error_code{};
694 202719x *bytes_out = bytes;
695 202719x return dispatch_coro(ex, h);
696 }
// No inline budget: hand (err, bytes) to the op and post it.
697 50657x op.h = h;
698 50657x op.ex = ex;
699 50657x op.ec_out = ec;
700 50657x op.bytes_out = bytes_out;
701 50657x op.start(token, static_cast<Derived*>(this));
702 50657x op.impl_ptr = this->shared_from_this();
703 50657x op.complete(err, bytes);
704 50657x svc_.post(&op);
705 50657x return std::noop_coroutine();
706 }
707
708 // EAGAIN — register with reactor
709 op.h = h;
710 op.ex = ex;
711 op.ec_out = ec;
712 op.bytes_out = bytes_out;
713 op.fd = fd_;
714 op.start(token, static_cast<Derived*>(this));
715 op.impl_ptr = this->shared_from_this();
716
717 register_op(
718 op, desc_state_.write_op, desc_state_.write_ready,
719 desc_state_.write_cancel_pending);
720 return std::noop_coroutine();
721 }
722
723 } // namespace boost::corosio::detail
724
725 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SOCKET_HPP
726