include/boost/corosio/native/detail/select/select_acceptor_service.hpp

61.9% Lines (117/189) 95.0% List of functions (19/20)
f(x) Functions (20)
Function Calls Lines Branches Blocks
boost::corosio::detail::select_acceptor_service::scheduler() const :74 0 100.0% boost::corosio::detail::select_accept_op::cancel() :91 0 0.0% boost::corosio::detail::select_accept_op::operator()() :100 0 100.0% boost::corosio::detail::select_acceptor::select_acceptor(boost::corosio::detail::select_acceptor_service&) :105 0 100.0% boost::corosio::detail::select_acceptor::accept(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*, boost::corosio::io_object::implementation**) :111 0 41.9% boost::corosio::detail::select_acceptor::cancel() :263 0 100.0% boost::corosio::detail::select_acceptor::close_socket() :269 0 100.0% boost::corosio::detail::select_acceptor_service::select_acceptor_service(boost::capy::execution_context&) :274 0 100.0% boost::corosio::detail::select_acceptor_service::~select_acceptor_service() :283 0 100.0% boost::corosio::detail::select_acceptor_service::shutdown() :286 0 80.0% boost::corosio::detail::select_acceptor_service::construct() :299 0 100.0% boost::corosio::detail::select_acceptor_service::destroy(boost::corosio::io_object::implementation*) :312 0 100.0% boost::corosio::detail::select_acceptor_service::close(boost::corosio::io_object::handle&) :322 0 100.0% boost::corosio::detail::select_acceptor_service::open_acceptor_socket(boost::corosio::tcp_acceptor::implementation&, int, int, int) :328 0 61.3% boost::corosio::detail::select_acceptor_service::bind_acceptor(boost::corosio::tcp_acceptor::implementation&, boost::corosio::endpoint) :391 0 100.0% boost::corosio::detail::select_acceptor_service::listen_acceptor(boost::corosio::tcp_acceptor::implementation&, int) :398 0 100.0% boost::corosio::detail::select_acceptor_service::post(boost::corosio::detail::scheduler_op*) :405 0 100.0% boost::corosio::detail::select_acceptor_service::work_started() :411 0 100.0% boost::corosio::detail::select_acceptor_service::work_finished() :417 0 100.0% boost::corosio::detail::select_acceptor_service::socket_service() const :423 0 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_acceptor.hpp>
22 #include <boost/corosio/native/detail/select/select_socket_service.hpp>
23 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
25
26 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
27
28 #include <memory>
29 #include <mutex>
30 #include <utility>
31
32 #include <errno.h>
33 #include <fcntl.h>
34 #include <netinet/in.h>
35 #include <sys/select.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38
39 namespace boost::corosio::detail {
40
41 /// State for select acceptor service.
42 using select_acceptor_state =
43 reactor_service_state<select_scheduler, select_acceptor>;
44
45 /** select acceptor service implementation.
46
47 Inherits from acceptor_service to enable runtime polymorphism.
48 Uses key_type = acceptor_service for service lookup.
49 */
50 class BOOST_COROSIO_DECL select_acceptor_service final : public acceptor_service
51 {
52 public:
53 explicit select_acceptor_service(capy::execution_context& ctx);
54 ~select_acceptor_service() override;
55
56 select_acceptor_service(select_acceptor_service const&) = delete;
57 select_acceptor_service& operator=(select_acceptor_service const&) = delete;
58
59 void shutdown() override;
60
61 io_object::implementation* construct() override;
62 void destroy(io_object::implementation*) override;
63 void close(io_object::handle&) override;
64 std::error_code open_acceptor_socket(
65 tcp_acceptor::implementation& impl,
66 int family,
67 int type,
68 int protocol) override;
69 std::error_code
70 bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
71 std::error_code
72 listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
73
74 116x select_scheduler& scheduler() const noexcept
75 {
76 116x return state_->sched_;
77 }
78 void post(scheduler_op* op);
79 void work_started() noexcept;
80 void work_finished() noexcept;
81
82 /** Get the socket service for creating peer sockets during accept. */
83 select_socket_service* socket_service() const noexcept;
84
85 private:
86 capy::execution_context& ctx_;
87 std::unique_ptr<select_acceptor_state> state_;
88 };
89
90 inline void
91 select_accept_op::cancel() noexcept
92 {
93 if (acceptor_impl_)
94 acceptor_impl_->cancel_single_op(*this);
95 else
96 request_cancel();
97 }
98
99 inline void
100 3087x select_accept_op::operator()()
101 {
102 3087x complete_accept_op<select_socket>(*this);
103 3087x }
104
105 61x inline select_acceptor::select_acceptor(select_acceptor_service& svc) noexcept
106 61x : reactor_acceptor(svc)
107 {
108 61x }
109
110 inline std::coroutine_handle<>
111 3087x select_acceptor::accept(
112 std::coroutine_handle<> h,
113 capy::executor_ref ex,
114 std::stop_token token,
115 std::error_code* ec,
116 io_object::implementation** impl_out)
117 {
118 3087x auto& op = acc_;
119 3087x op.reset();
120 3087x op.h = h;
121 3087x op.ex = ex;
122 3087x op.ec_out = ec;
123 3087x op.impl_out = impl_out;
124 3087x op.fd = fd_;
125 3087x op.start(token, this);
126
127 3087x sockaddr_storage peer_storage{};
128 3087x socklen_t addrlen = sizeof(peer_storage);
129 int accepted;
130 do
131 {
132 accepted =
133 3087x ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
134 }
135 3087x while (accepted < 0 && errno == EINTR);
136
137 3087x if (accepted >= 0)
138 {
139 2x if (accepted >= FD_SETSIZE)
140 {
141 ::close(accepted);
142 op.complete(EINVAL, 0);
143 op.impl_ptr = shared_from_this();
144 svc_.post(&op);
145 return std::noop_coroutine();
146 }
147
148 2x int flags = ::fcntl(accepted, F_GETFL, 0);
149 2x if (flags == -1)
150 {
151 int err = errno;
152 ::close(accepted);
153 op.complete(err, 0);
154 op.impl_ptr = shared_from_this();
155 svc_.post(&op);
156 return std::noop_coroutine();
157 }
158
159 2x if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
160 {
161 int err = errno;
162 ::close(accepted);
163 op.complete(err, 0);
164 op.impl_ptr = shared_from_this();
165 svc_.post(&op);
166 return std::noop_coroutine();
167 }
168
169 2x if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
170 {
171 int err = errno;
172 ::close(accepted);
173 op.complete(err, 0);
174 op.impl_ptr = shared_from_this();
175 svc_.post(&op);
176 return std::noop_coroutine();
177 }
178
179 {
180 2x std::lock_guard lock(desc_state_.mutex);
181 2x desc_state_.read_ready = false;
182 2x }
183
184 2x if (svc_.scheduler().try_consume_inline_budget())
185 {
186 auto* socket_svc = svc_.socket_service();
187 if (socket_svc)
188 {
189 auto& impl =
190 static_cast<select_socket&>(*socket_svc->construct());
191 impl.set_socket(accepted);
192
193 impl.desc_state_.fd = accepted;
194 {
195 std::lock_guard lock(impl.desc_state_.mutex);
196 impl.desc_state_.read_op = nullptr;
197 impl.desc_state_.write_op = nullptr;
198 impl.desc_state_.connect_op = nullptr;
199 }
200 socket_svc->scheduler().register_descriptor(
201 accepted, &impl.desc_state_);
202
203 impl.set_endpoints(
204 local_endpoint_, from_sockaddr(peer_storage));
205
206 *ec = {};
207 if (impl_out)
208 *impl_out = &impl;
209 }
210 else
211 {
212 ::close(accepted);
213 *ec = make_err(ENOENT);
214 if (impl_out)
215 *impl_out = nullptr;
216 }
217 return dispatch_coro(ex, h);
218 }
219
220 2x op.accepted_fd = accepted;
221 2x op.peer_storage = peer_storage;
222 2x op.complete(0, 0);
223 2x op.impl_ptr = shared_from_this();
224 2x svc_.post(&op);
225 2x return std::noop_coroutine();
226 }
227
228 3085x if (errno == EAGAIN || errno == EWOULDBLOCK)
229 {
230 3085x op.impl_ptr = shared_from_this();
231 3085x svc_.work_started();
232
233 3085x std::lock_guard lock(desc_state_.mutex);
234 3085x bool io_done = false;
235 3085x if (desc_state_.read_ready)
236 {
237 desc_state_.read_ready = false;
238 op.perform_io();
239 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
240 if (!io_done)
241 op.errn = 0;
242 }
243
244 3085x if (io_done || op.cancelled.load(std::memory_order_acquire))
245 {
246 svc_.post(&op);
247 svc_.work_finished();
248 }
249 else
250 {
251 3085x desc_state_.read_op = &op;
252 }
253 3085x return std::noop_coroutine();
254 3085x }
255
256 op.complete(errno, 0);
257 op.impl_ptr = shared_from_this();
258 svc_.post(&op);
259 return std::noop_coroutine();
260 }
261
262 inline void
263 2x select_acceptor::cancel() noexcept
264 {
265 2x do_cancel();
266 2x }
267
268 inline void
269 240x select_acceptor::close_socket() noexcept
270 {
271 240x do_close_socket();
272 240x }
273
274 168x inline select_acceptor_service::select_acceptor_service(
275 168x capy::execution_context& ctx)
276 168x : ctx_(ctx)
277 168x , state_(
278 std::make_unique<select_acceptor_state>(
279 168x ctx.use_service<select_scheduler>()))
280 {
281 168x }
282
283 336x inline select_acceptor_service::~select_acceptor_service() {}
284
285 inline void
286 168x select_acceptor_service::shutdown()
287 {
288 168x std::lock_guard lock(state_->mutex_);
289
290 168x while (auto* impl = state_->impl_list_.pop_front())
291 impl->close_socket();
292
293 // Don't clear impl_ptrs_ here — same rationale as
294 // select_socket_service::shutdown(). Let ~state_ release ptrs
295 // after scheduler shutdown has drained all queued ops.
296 168x }
297
298 inline io_object::implementation*
299 61x select_acceptor_service::construct()
300 {
301 61x auto impl = std::make_shared<select_acceptor>(*this);
302 61x auto* raw = impl.get();
303
304 61x std::lock_guard lock(state_->mutex_);
305 61x state_->impl_ptrs_.emplace(raw, std::move(impl));
306 61x state_->impl_list_.push_back(raw);
307
308 61x return raw;
309 61x }
310
311 inline void
312 61x select_acceptor_service::destroy(io_object::implementation* impl)
313 {
314 61x auto* select_impl = static_cast<select_acceptor*>(impl);
315 61x select_impl->close_socket();
316 61x std::lock_guard lock(state_->mutex_);
317 61x state_->impl_list_.remove(select_impl);
318 61x state_->impl_ptrs_.erase(select_impl);
319 61x }
320
321 inline void
322 120x select_acceptor_service::close(io_object::handle& h)
323 {
324 120x static_cast<select_acceptor*>(h.get())->close_socket();
325 120x }
326
327 inline std::error_code
328 59x select_acceptor_service::open_acceptor_socket(
329 tcp_acceptor::implementation& impl, int family, int type, int protocol)
330 {
331 59x auto* select_impl = static_cast<select_acceptor*>(&impl);
332 59x select_impl->close_socket();
333
334 59x int fd = ::socket(family, type, protocol);
335 59x if (fd < 0)
336 return make_err(errno);
337
338 59x int flags = ::fcntl(fd, F_GETFL, 0);
339 59x if (flags == -1)
340 {
341 int errn = errno;
342 ::close(fd);
343 return make_err(errn);
344 }
345 59x if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
346 {
347 int errn = errno;
348 ::close(fd);
349 return make_err(errn);
350 }
351 59x if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
352 {
353 int errn = errno;
354 ::close(fd);
355 return make_err(errn);
356 }
357
358 59x if (fd >= FD_SETSIZE)
359 {
360 ::close(fd);
361 return make_err(EMFILE);
362 }
363
364 59x if (family == AF_INET6)
365 {
366 8x int val = 0; // dual-stack default
367 8x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
368 }
369
370 #ifdef SO_NOSIGPIPE
371 {
372 int nosig = 1;
373 ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &nosig, sizeof(nosig));
374 }
375 #endif
376
377 59x select_impl->fd_ = fd;
378
379 // Set up descriptor state but do NOT register with reactor yet
380 // (registration happens in do_listen via reactor_acceptor base)
381 59x select_impl->desc_state_.fd = fd;
382 {
383 59x std::lock_guard lock(select_impl->desc_state_.mutex);
384 59x select_impl->desc_state_.read_op = nullptr;
385 59x }
386
387 59x return {};
388 }
389
390 inline std::error_code
391 58x select_acceptor_service::bind_acceptor(
392 tcp_acceptor::implementation& impl, endpoint ep)
393 {
394 58x return static_cast<select_acceptor*>(&impl)->do_bind(ep);
395 }
396
397 inline std::error_code
398 57x select_acceptor_service::listen_acceptor(
399 tcp_acceptor::implementation& impl, int backlog)
400 {
401 57x return static_cast<select_acceptor*>(&impl)->do_listen(backlog);
402 }
403
404 inline void
405 5x select_acceptor_service::post(scheduler_op* op)
406 {
407 5x state_->sched_.post(op);
408 5x }
409
410 inline void
411 3085x select_acceptor_service::work_started() noexcept
412 {
413 3085x state_->sched_.work_started();
414 3085x }
415
416 inline void
417 3x select_acceptor_service::work_finished() noexcept
418 {
419 3x state_->sched_.work_finished();
420 3x }
421
422 inline select_socket_service*
423 3084x select_acceptor_service::socket_service() const noexcept
424 {
425 3084x auto* svc = ctx_.find_service<detail::socket_service>();
426 3084x return svc ? dynamic_cast<select_socket_service*>(svc) : nullptr;
427 }
428
429 } // namespace boost::corosio::detail
430
431 #endif // BOOST_COROSIO_HAS_SELECT
432
433 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
434