include/boost/corosio/native/detail/reactor/reactor_op_complete.hpp

89.3% Lines (150/168) 100.0% List of functions (8/8)
f(x) Functions (8)
Function Calls Lines Branches Blocks
void boost::corosio::detail::complete_io_op<boost::corosio::detail::epoll_op>(boost::corosio::detail::epoll_op&) :39 0 87.5% void boost::corosio::detail::complete_io_op<boost::corosio::detail::select_op>(boost::corosio::detail::select_op&) :39 0 87.5% void boost::corosio::detail::complete_connect_op<boost::corosio::detail::epoll_connect_op>(boost::corosio::detail::epoll_connect_op&) :72 0 95.7% void boost::corosio::detail::complete_connect_op<boost::corosio::detail::select_connect_op>(boost::corosio::detail::select_connect_op&) :72 0 95.7% bool boost::corosio::detail::setup_accepted_socket<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_acceptor>(boost::corosio::detail::epoll_acceptor*, int&, sockaddr_storage const&, boost::corosio::io_object::implementation**, std::error_code*) :123 0 89.5% bool boost::corosio::detail::setup_accepted_socket<boost::corosio::detail::select_socket, boost::corosio::detail::select_acceptor>(boost::corosio::detail::select_acceptor*, int&, sockaddr_storage const&, boost::corosio::io_object::implementation**, std::error_code*) :123 0 89.5% void boost::corosio::detail::complete_accept_op<boost::corosio::detail::epoll_socket, boost::corosio::detail::epoll_accept_op>(boost::corosio::detail::epoll_accept_op&) :171 0 84.6% void boost::corosio::detail::complete_accept_op<boost::corosio::detail::select_socket, boost::corosio::detail::select_accept_op>(boost::corosio::detail::select_accept_op&) :171 0 84.6%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
12
13 #include <boost/corosio/detail/dispatch_coro.hpp>
14 #include <boost/corosio/native/detail/endpoint_convert.hpp>
15 #include <boost/corosio/native/detail/make_err.hpp>
16 #include <boost/corosio/io/io_object.hpp>
17
18 #include <coroutine>
19 #include <mutex>
20 #include <utility>
21
22 #include <netinet/in.h>
23 #include <sys/socket.h>
24 #include <unistd.h>
25
26 namespace boost::corosio::detail {
27
28 /** Complete a base read/write operation.
29
30 Translates the recorded errno and cancellation state into
31 an error_code, stores the byte count, then resumes the
32 caller via symmetric transfer.
33
34 @tparam Op The concrete operation type.
35 @param op The operation to complete.
36 */
37 template <typename Op>
38 void
39 101676x complete_io_op(Op& op)
40 {
41 101676x op.stop_cb.reset();
42 101676x op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
43
44 101676x if (op.cancelled.load(std::memory_order_acquire))
45 310x *op.ec_out = capy::error::canceled;
46 101366x else if (op.errn != 0)
47 *op.ec_out = make_err(op.errn);
48 101366x else if (op.is_read_operation() && op.bytes_transferred == 0)
49 *op.ec_out = capy::error::eof;
50 else
51 101366x *op.ec_out = {};
52
53 101676x *op.bytes_out = op.bytes_transferred;
54
55 101676x capy::executor_ref saved_ex(op.ex);
56 101676x std::coroutine_handle<> saved_h(op.h);
57 101676x auto prevent = std::move(op.impl_ptr);
58 101676x dispatch_coro(saved_ex, saved_h).resume();
59 101676x }
60
61 /** Complete a connect operation with endpoint caching.
62
63 On success, queries the local endpoint via getsockname and
64 caches both endpoints in the socket impl. Then resumes the
65 caller via symmetric transfer.
66
67 @tparam Op The concrete connect operation type.
68 @param op The operation to complete.
69 */
70 template <typename Op>
71 void
72 7485x complete_connect_op(Op& op)
73 {
74 7485x op.stop_cb.reset();
75 7485x op.socket_impl_->desc_state_.scheduler_->reset_inline_budget();
76
77 7485x bool success =
78 7485x (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
79
80 7485x if (success && op.socket_impl_)
81 {
82 7481x endpoint local_ep;
83 7481x sockaddr_storage local_storage{};
84 7481x socklen_t local_len = sizeof(local_storage);
85 7481x if (::getsockname(
86 op.fd,
87 reinterpret_cast<sockaddr*>(&local_storage),
88 7481x &local_len) == 0)
89 7481x local_ep = from_sockaddr(local_storage);
90 7481x op.socket_impl_->set_endpoints(local_ep, op.target_endpoint);
91 }
92
93 7485x if (op.cancelled.load(std::memory_order_acquire))
94 *op.ec_out = capy::error::canceled;
95 7485x else if (op.errn != 0)
96 4x *op.ec_out = make_err(op.errn);
97 else
98 7481x *op.ec_out = {};
99
100 7485x capy::executor_ref saved_ex(op.ex);
101 7485x std::coroutine_handle<> saved_h(op.h);
102 7485x auto prevent = std::move(op.impl_ptr);
103 7485x dispatch_coro(saved_ex, saved_h).resume();
104 7485x }
105
106 /** Construct and register a peer socket from an accepted fd.
107
108 Creates a new socket impl via the acceptor's associated
109 socket service, registers it with the scheduler, and caches
110 the local and remote endpoints.
111
112 @tparam SocketImpl The concrete socket implementation type.
113 @tparam AcceptorImpl The concrete acceptor implementation type.
114 @param acceptor_impl The acceptor that accepted the connection.
115 @param accepted_fd The accepted file descriptor (set to -1 on success).
116 @param peer_storage The peer address from accept().
117 @param impl_out Output pointer for the new socket impl.
118 @param ec_out Output pointer for any error.
119 @return True on success, false on failure.
120 */
121 template <typename SocketImpl, typename AcceptorImpl>
122 bool
123 7481x setup_accepted_socket(
124 AcceptorImpl* acceptor_impl,
125 int& accepted_fd,
126 sockaddr_storage const& peer_storage,
127 io_object::implementation** impl_out,
128 std::error_code* ec_out)
129 {
130 7481x auto* socket_svc = acceptor_impl->service().socket_service();
131 7481x if (!socket_svc)
132 {
133 *ec_out = make_err(ENOENT);
134 return false;
135 }
136
137 7481x auto& impl = static_cast<SocketImpl&>(*socket_svc->construct());
138 7481x impl.set_socket(accepted_fd);
139
140 7481x impl.desc_state_.fd = accepted_fd;
141 {
142 7481x std::lock_guard lock(impl.desc_state_.mutex);
143 7481x impl.desc_state_.read_op = nullptr;
144 7481x impl.desc_state_.write_op = nullptr;
145 7481x impl.desc_state_.connect_op = nullptr;
146 7481x }
147 7481x socket_svc->scheduler().register_descriptor(
148 accepted_fd, &impl.desc_state_);
149
150 7481x impl.set_endpoints(
151 acceptor_impl->local_endpoint(),
152 from_sockaddr(peer_storage));
153
154 7481x if (impl_out)
155 7481x *impl_out = &impl;
156 7481x accepted_fd = -1;
157 7481x return true;
158 }
159
160 /** Complete an accept operation.
161
162 Sets up the peer socket on success, or closes the accepted
163 fd on failure. Then resumes the caller via symmetric transfer.
164
165 @tparam SocketImpl The concrete socket implementation type.
166 @tparam Op The concrete accept operation type.
167 @param op The operation to complete.
168 */
169 template <typename SocketImpl, typename Op>
170 void
171 7493x complete_accept_op(Op& op)
172 {
173 7493x op.stop_cb.reset();
174 7493x op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget();
175
176 7493x bool success =
177 7493x (op.errn == 0 && !op.cancelled.load(std::memory_order_acquire));
178
179 7493x if (op.cancelled.load(std::memory_order_acquire))
180 12x *op.ec_out = capy::error::canceled;
181 7481x else if (op.errn != 0)
182 *op.ec_out = make_err(op.errn);
183 else
184 7481x *op.ec_out = {};
185
186 7493x if (success && op.accepted_fd >= 0 && op.acceptor_impl_)
187 {
188 7481x if (!setup_accepted_socket<SocketImpl>(
189 op.acceptor_impl_,
190 7481x op.accepted_fd,
191 7481x op.peer_storage,
192 op.impl_out,
193 op.ec_out))
194 success = false;
195 }
196
197 7493x if (!success || !op.acceptor_impl_)
198 {
199 12x if (op.accepted_fd >= 0)
200 {
201 ::close(op.accepted_fd);
202 op.accepted_fd = -1;
203 }
204 12x if (op.impl_out)
205 12x *op.impl_out = nullptr;
206 }
207
208 7493x capy::executor_ref saved_ex(op.ex);
209 7493x std::coroutine_handle<> saved_h(op.h);
210 7493x auto prevent = std::move(op.impl_ptr);
211 7493x dispatch_coro(saved_ex, saved_h).resume();
212 7493x }
213
214 } // namespace boost::corosio::detail
215
216 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_OP_COMPLETE_HPP
217