include/boost/corosio/native/detail/select/select_socket_service.hpp

79.7% Lines (94/118) 91.3% List of functions (21/23)
f(x) Functions (23)
Function Calls Lines Branches Blocks
boost::corosio::detail::select_socket_service::scheduler() const :78 0 100.0% boost::corosio::detail::select_connect_op::cancel() :91 0 0.0% boost::corosio::detail::select_read_op::cancel() :100 0 80.0% boost::corosio::detail::select_write_op::cancel() :109 0 0.0% boost::corosio::detail::select_op::operator()() :118 0 100.0% boost::corosio::detail::select_connect_op::operator()() :124 0 100.0% boost::corosio::detail::select_socket::select_socket(boost::corosio::detail::select_socket_service&) :129 0 100.0% boost::corosio::detail::select_socket::~select_socket() :134 0 100.0% boost::corosio::detail::select_socket::connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::stop_token, std::error_code*) :137 0 100.0% boost::corosio::detail::select_socket::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :152 0 100.0% boost::corosio::detail::select_socket::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :164 0 100.0% boost::corosio::detail::select_socket::cancel() :180 0 100.0% boost::corosio::detail::select_socket::close_socket() :186 0 100.0% boost::corosio::detail::select_socket_service::select_socket_service(boost::capy::execution_context&) :191 0 100.0% boost::corosio::detail::select_socket_service::~select_socket_service() :199 0 100.0% boost::corosio::detail::select_socket_service::shutdown() :202 0 80.0% boost::corosio::detail::select_socket_service::construct() :216 0 100.0% boost::corosio::detail::select_socket_service::destroy(boost::corosio::io_object::implementation*) :231 0 100.0% boost::corosio::detail::select_socket_service::open_socket(boost::corosio::tcp_socket::implementation&, int, int, int) :241 0 64.7% boost::corosio::detail::select_socket_service::close(boost::corosio::io_object::handle&) :305 0 100.0% boost::corosio::detail::select_socket_service::post(boost::corosio::detail::scheduler_op*) :311 0 100.0% boost::corosio::detail::select_socket_service::work_started() :317 0 100.0% boost::corosio::detail::select_socket_service::work_finished() :323 0 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_socket.hpp>
22 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
23 #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
24
25 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26
27 #include <coroutine>
28 #include <mutex>
29 #include <utility>
30
31 #include <errno.h>
32 #include <fcntl.h>
33 #include <netinet/in.h>
34 #include <netinet/tcp.h>
35 #include <sys/select.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38
39 /*
40 Each I/O op tries the syscall speculatively; only registers with
41 the reactor on EAGAIN. Fd is registered once at open time and
42 stays registered until close. The reactor only marks ready_events_;
43 actual I/O happens in invoke_deferred_io(). cancel() captures
44 shared_from_this() into op.impl_ptr to keep the impl alive.
45 */
46
47 namespace boost::corosio::detail {
48
49 /// State for select socket service.
50 using select_socket_state =
51 reactor_service_state<select_scheduler, select_socket>;
52
53 /** select socket service implementation.
54
55 Inherits from socket_service to enable runtime polymorphism.
56 Uses key_type = socket_service for service lookup.
57 */
58 class BOOST_COROSIO_DECL select_socket_service final : public socket_service
59 {
60 public:
61 explicit select_socket_service(capy::execution_context& ctx);
62 ~select_socket_service() override;
63
64 select_socket_service(select_socket_service const&) = delete;
65 select_socket_service& operator=(select_socket_service const&) = delete;
66
67 void shutdown() override;
68
69 io_object::implementation* construct() override;
70 void destroy(io_object::implementation*) override;
71 void close(io_object::handle&) override;
72 std::error_code open_socket(
73 tcp_socket::implementation& impl,
74 int family,
75 int type,
76 int protocol) override;
77
78 292886x select_scheduler& scheduler() const noexcept
79 {
80 292886x return state_->sched_;
81 }
82 void post(scheduler_op* op);
83 void work_started() noexcept;
84 void work_finished() noexcept;
85
86 private:
87 std::unique_ptr<select_socket_state> state_;
88 };
89
90 inline void
91 select_connect_op::cancel() noexcept
92 {
93 if (socket_impl_)
94 socket_impl_->cancel_single_op(*this);
95 else
96 request_cancel();
97 }
98
99 inline void
100 94x select_read_op::cancel() noexcept
101 {
102 94x if (socket_impl_)
103 94x socket_impl_->cancel_single_op(*this);
104 else
105 request_cancel();
106 94x }
107
108 inline void
109 select_write_op::cancel() noexcept
110 {
111 if (socket_impl_)
112 socket_impl_->cancel_single_op(*this);
113 else
114 request_cancel();
115 }
116
117 inline void
118 50611x select_op::operator()()
119 {
120 50611x complete_io_op(*this);
121 50611x }
122
123 inline void
124 3086x select_connect_op::operator()()
125 {
126 3086x complete_connect_op(*this);
127 3086x }
128
129 9278x inline select_socket::select_socket(select_socket_service& svc) noexcept
130 9278x : reactor_socket(svc)
131 {
132 9278x }
133
134 9278x inline select_socket::~select_socket() = default;
135
136 inline std::coroutine_handle<>
137 3086x select_socket::connect(
138 std::coroutine_handle<> h,
139 capy::executor_ref ex,
140 endpoint ep,
141 std::stop_token token,
142 std::error_code* ec)
143 {
144 3086x auto result = do_connect(h, ex, ep, token, ec);
145 // Rebuild fd_sets so select() watches for writability
146 3086x if (result == std::noop_coroutine())
147 3086x svc_.scheduler().notify_reactor();
148 3086x return result;
149 }
150
151 inline std::coroutine_handle<>
152 126277x select_socket::read_some(
153 std::coroutine_handle<> h,
154 capy::executor_ref ex,
155 buffer_param param,
156 std::stop_token token,
157 std::error_code* ec,
158 std::size_t* bytes_out)
159 {
160 126277x return do_read_some(h, ex, param, token, ec, bytes_out);
161 }
162
163 inline std::coroutine_handle<>
164 126129x select_socket::write_some(
165 std::coroutine_handle<> h,
166 capy::executor_ref ex,
167 buffer_param param,
168 std::stop_token token,
169 std::error_code* ec,
170 std::size_t* bytes_out)
171 {
172 126129x auto result = do_write_some(h, ex, param, token, ec, bytes_out);
173 // Rebuild fd_sets so select() watches for writability
174 126129x if (result == std::noop_coroutine())
175 25218x svc_.scheduler().notify_reactor();
176 126129x return result;
177 }
178
179 inline void
180 94x select_socket::cancel() noexcept
181 {
182 94x do_cancel();
183 94x }
184
185 inline void
186 27842x select_socket::close_socket() noexcept
187 {
188 27842x do_close_socket();
189 27842x }
190
191 168x inline select_socket_service::select_socket_service(
192 168x capy::execution_context& ctx)
193 168x : state_(
194 std::make_unique<select_socket_state>(
195 168x ctx.use_service<select_scheduler>()))
196 {
197 168x }
198
199 336x inline select_socket_service::~select_socket_service() {}
200
201 inline void
202 168x select_socket_service::shutdown()
203 {
204 168x std::lock_guard lock(state_->mutex_);
205
206 168x while (auto* impl = state_->impl_list_.pop_front())
207 impl->close_socket();
208
209 // Don't clear impl_ptrs_ here. The scheduler shuts down after us and
210 // drains completed_ops_, calling destroy() on each queued op. Letting
211 // ~state_ release the ptrs (during service destruction, after scheduler
212 // shutdown) keeps every impl alive until all ops have been drained.
213 168x }
214
215 inline io_object::implementation*
216 9278x select_socket_service::construct()
217 {
218 9278x auto impl = std::make_shared<select_socket>(*this);
219 9278x auto* raw = impl.get();
220
221 {
222 9278x std::lock_guard lock(state_->mutex_);
223 9278x state_->impl_ptrs_.emplace(raw, std::move(impl));
224 9278x state_->impl_list_.push_back(raw);
225 9278x }
226
227 9278x return raw;
228 9278x }
229
230 inline void
231 9278x select_socket_service::destroy(io_object::implementation* impl)
232 {
233 9278x auto* select_impl = static_cast<select_socket*>(impl);
234 9278x select_impl->close_socket();
235 9278x std::lock_guard lock(state_->mutex_);
236 9278x state_->impl_list_.remove(select_impl);
237 9278x state_->impl_ptrs_.erase(select_impl);
238 9278x }
239
240 inline std::error_code
241 3101x select_socket_service::open_socket(
242 tcp_socket::implementation& impl, int family, int type, int protocol)
243 {
244 3101x auto* select_impl = static_cast<select_socket*>(&impl);
245 3101x select_impl->close_socket();
246
247 3101x int fd = ::socket(family, type, protocol);
248 3101x if (fd < 0)
249 return make_err(errno);
250
251 3101x if (family == AF_INET6)
252 {
253 5x int one = 1;
254 5x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
255 }
256
257 3101x int flags = ::fcntl(fd, F_GETFL, 0);
258 3101x if (flags == -1)
259 {
260 int errn = errno;
261 ::close(fd);
262 return make_err(errn);
263 }
264 3101x if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
265 {
266 int errn = errno;
267 ::close(fd);
268 return make_err(errn);
269 }
270 3101x if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
271 {
272 int errn = errno;
273 ::close(fd);
274 return make_err(errn);
275 }
276
277 3101x if (fd >= FD_SETSIZE)
278 {
279 ::close(fd);
280 return make_err(EMFILE);
281 }
282
283 #ifdef SO_NOSIGPIPE
284 {
285 int one = 1;
286 ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
287 }
288 #endif
289
290 3101x select_impl->fd_ = fd;
291
292 3101x select_impl->desc_state_.fd = fd;
293 {
294 3101x std::lock_guard lock(select_impl->desc_state_.mutex);
295 3101x select_impl->desc_state_.read_op = nullptr;
296 3101x select_impl->desc_state_.write_op = nullptr;
297 3101x select_impl->desc_state_.connect_op = nullptr;
298 3101x }
299 3101x scheduler().register_descriptor(fd, &select_impl->desc_state_);
300
301 3101x return {};
302 }
303
304 inline void
305 15463x select_socket_service::close(io_object::handle& h)
306 {
307 15463x static_cast<select_socket*>(h.get())->close_socket();
308 15463x }
309
310 inline void
311 50563x select_socket_service::post(scheduler_op* op)
312 {
313 50563x state_->sched_.post(op);
314 50563x }
315
316 inline void
317 3278x select_socket_service::work_started() noexcept
318 {
319 3278x state_->sched_.work_started();
320 3278x }
321
322 inline void
323 144x select_socket_service::work_finished() noexcept
324 {
325 144x state_->sched_.work_finished();
326 144x }
327
328 } // namespace boost::corosio::detail
329
330 #endif // BOOST_COROSIO_HAS_SELECT
331
332 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
333