TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/select/select_acceptor.hpp>
22 : #include <boost/corosio/native/detail/select/select_socket_service.hpp>
23 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 : #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
25 :
26 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
27 :
28 : #include <memory>
29 : #include <mutex>
30 : #include <utility>
31 :
32 : #include <errno.h>
33 : #include <fcntl.h>
34 : #include <netinet/in.h>
35 : #include <sys/select.h>
36 : #include <sys/socket.h>
37 : #include <unistd.h>
38 :
39 : namespace boost::corosio::detail {
40 :
41 : /// State for select acceptor service.
42 : using select_acceptor_state =
43 : reactor_service_state<select_scheduler, select_acceptor>;
44 :
45 : /** select acceptor service implementation.
46 :
47 : Inherits from acceptor_service to enable runtime polymorphism.
48 : Uses key_type = acceptor_service for service lookup.
49 : */
50 : class BOOST_COROSIO_DECL select_acceptor_service final : public acceptor_service
51 : {
52 : public:
53 : explicit select_acceptor_service(capy::execution_context& ctx);
54 : ~select_acceptor_service() override;
55 :
56 : select_acceptor_service(select_acceptor_service const&) = delete;
57 : select_acceptor_service& operator=(select_acceptor_service const&) = delete;
58 :
59 : void shutdown() override;
60 :
61 : io_object::implementation* construct() override;
62 : void destroy(io_object::implementation*) override;
63 : void close(io_object::handle&) override;
64 : std::error_code open_acceptor_socket(
65 : tcp_acceptor::implementation& impl,
66 : int family,
67 : int type,
68 : int protocol) override;
69 : std::error_code
70 : bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
71 : std::error_code
72 : listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
73 :
74 HIT 116 : select_scheduler& scheduler() const noexcept
75 : {
76 116 : return state_->sched_;
77 : }
78 : void post(scheduler_op* op);
79 : void work_started() noexcept;
80 : void work_finished() noexcept;
81 :
82 : /** Get the socket service for creating peer sockets during accept. */
83 : select_socket_service* socket_service() const noexcept;
84 :
85 : private:
86 : capy::execution_context& ctx_;
87 : std::unique_ptr<select_acceptor_state> state_;
88 : };
89 :
90 : inline void
91 MIS 0 : select_accept_op::cancel() noexcept
92 : {
93 0 : if (acceptor_impl_)
94 0 : acceptor_impl_->cancel_single_op(*this);
95 : else
96 0 : request_cancel();
97 0 : }
98 :
99 : inline void
100 HIT 3087 : select_accept_op::operator()()
101 : {
102 3087 : complete_accept_op<select_socket>(*this);
103 3087 : }
104 :
105 61 : inline select_acceptor::select_acceptor(select_acceptor_service& svc) noexcept
106 61 : : reactor_acceptor(svc)
107 : {
108 61 : }
109 :
110 : inline std::coroutine_handle<>
111 3087 : select_acceptor::accept(
112 : std::coroutine_handle<> h,
113 : capy::executor_ref ex,
114 : std::stop_token token,
115 : std::error_code* ec,
116 : io_object::implementation** impl_out)
117 : {
118 3087 : auto& op = acc_;
119 3087 : op.reset();
120 3087 : op.h = h;
121 3087 : op.ex = ex;
122 3087 : op.ec_out = ec;
123 3087 : op.impl_out = impl_out;
124 3087 : op.fd = fd_;
125 3087 : op.start(token, this);
126 :
127 3087 : sockaddr_storage peer_storage{};
128 3087 : socklen_t addrlen = sizeof(peer_storage);
129 : int accepted;
130 : do
131 : {
132 : accepted =
133 3087 : ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
134 : }
135 3087 : while (accepted < 0 && errno == EINTR);
136 :
137 3087 : if (accepted >= 0)
138 : {
139 2 : if (accepted >= FD_SETSIZE)
140 : {
141 MIS 0 : ::close(accepted);
142 0 : op.complete(EINVAL, 0);
143 0 : op.impl_ptr = shared_from_this();
144 0 : svc_.post(&op);
145 0 : return std::noop_coroutine();
146 : }
147 :
148 HIT 2 : int flags = ::fcntl(accepted, F_GETFL, 0);
149 2 : if (flags == -1)
150 : {
151 MIS 0 : int err = errno;
152 0 : ::close(accepted);
153 0 : op.complete(err, 0);
154 0 : op.impl_ptr = shared_from_this();
155 0 : svc_.post(&op);
156 0 : return std::noop_coroutine();
157 : }
158 :
159 HIT 2 : if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
160 : {
161 MIS 0 : int err = errno;
162 0 : ::close(accepted);
163 0 : op.complete(err, 0);
164 0 : op.impl_ptr = shared_from_this();
165 0 : svc_.post(&op);
166 0 : return std::noop_coroutine();
167 : }
168 :
169 HIT 2 : if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
170 : {
171 MIS 0 : int err = errno;
172 0 : ::close(accepted);
173 0 : op.complete(err, 0);
174 0 : op.impl_ptr = shared_from_this();
175 0 : svc_.post(&op);
176 0 : return std::noop_coroutine();
177 : }
178 :
179 : {
180 HIT 2 : std::lock_guard lock(desc_state_.mutex);
181 2 : desc_state_.read_ready = false;
182 2 : }
183 :
184 2 : if (svc_.scheduler().try_consume_inline_budget())
185 : {
186 MIS 0 : auto* socket_svc = svc_.socket_service();
187 0 : if (socket_svc)
188 : {
189 : auto& impl =
190 0 : static_cast<select_socket&>(*socket_svc->construct());
191 0 : impl.set_socket(accepted);
192 :
193 0 : impl.desc_state_.fd = accepted;
194 : {
195 0 : std::lock_guard lock(impl.desc_state_.mutex);
196 0 : impl.desc_state_.read_op = nullptr;
197 0 : impl.desc_state_.write_op = nullptr;
198 0 : impl.desc_state_.connect_op = nullptr;
199 0 : }
200 0 : socket_svc->scheduler().register_descriptor(
201 : accepted, &impl.desc_state_);
202 :
203 0 : impl.set_endpoints(
204 : local_endpoint_, from_sockaddr(peer_storage));
205 :
206 0 : *ec = {};
207 0 : if (impl_out)
208 0 : *impl_out = &impl;
209 : }
210 : else
211 : {
212 0 : ::close(accepted);
213 0 : *ec = make_err(ENOENT);
214 0 : if (impl_out)
215 0 : *impl_out = nullptr;
216 : }
217 0 : return dispatch_coro(ex, h);
218 : }
219 :
220 HIT 2 : op.accepted_fd = accepted;
221 2 : op.peer_storage = peer_storage;
222 2 : op.complete(0, 0);
223 2 : op.impl_ptr = shared_from_this();
224 2 : svc_.post(&op);
225 2 : return std::noop_coroutine();
226 : }
227 :
228 3085 : if (errno == EAGAIN || errno == EWOULDBLOCK)
229 : {
230 3085 : op.impl_ptr = shared_from_this();
231 3085 : svc_.work_started();
232 :
233 3085 : std::lock_guard lock(desc_state_.mutex);
234 3085 : bool io_done = false;
235 3085 : if (desc_state_.read_ready)
236 : {
237 MIS 0 : desc_state_.read_ready = false;
238 0 : op.perform_io();
239 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
240 0 : if (!io_done)
241 0 : op.errn = 0;
242 : }
243 :
244 HIT 3085 : if (io_done || op.cancelled.load(std::memory_order_acquire))
245 : {
246 MIS 0 : svc_.post(&op);
247 0 : svc_.work_finished();
248 : }
249 : else
250 : {
251 HIT 3085 : desc_state_.read_op = &op;
252 : }
253 3085 : return std::noop_coroutine();
254 3085 : }
255 :
256 MIS 0 : op.complete(errno, 0);
257 0 : op.impl_ptr = shared_from_this();
258 0 : svc_.post(&op);
259 0 : return std::noop_coroutine();
260 : }
261 :
262 : inline void
263 HIT 2 : select_acceptor::cancel() noexcept
264 : {
265 2 : do_cancel();
266 2 : }
267 :
268 : inline void
269 240 : select_acceptor::close_socket() noexcept
270 : {
271 240 : do_close_socket();
272 240 : }
273 :
274 168 : inline select_acceptor_service::select_acceptor_service(
275 168 : capy::execution_context& ctx)
276 168 : : ctx_(ctx)
277 168 : , state_(
278 : std::make_unique<select_acceptor_state>(
279 168 : ctx.use_service<select_scheduler>()))
280 : {
281 168 : }
282 :
283 336 : inline select_acceptor_service::~select_acceptor_service() {}
284 :
285 : inline void
286 168 : select_acceptor_service::shutdown()
287 : {
288 168 : std::lock_guard lock(state_->mutex_);
289 :
290 168 : while (auto* impl = state_->impl_list_.pop_front())
291 MIS 0 : impl->close_socket();
292 :
293 : // Don't clear impl_ptrs_ here — same rationale as
294 : // select_socket_service::shutdown(). Let ~state_ release ptrs
295 : // after scheduler shutdown has drained all queued ops.
296 HIT 168 : }
297 :
298 : inline io_object::implementation*
299 61 : select_acceptor_service::construct()
300 : {
301 61 : auto impl = std::make_shared<select_acceptor>(*this);
302 61 : auto* raw = impl.get();
303 :
304 61 : std::lock_guard lock(state_->mutex_);
305 61 : state_->impl_ptrs_.emplace(raw, std::move(impl));
306 61 : state_->impl_list_.push_back(raw);
307 :
308 61 : return raw;
309 61 : }
310 :
311 : inline void
312 61 : select_acceptor_service::destroy(io_object::implementation* impl)
313 : {
314 61 : auto* select_impl = static_cast<select_acceptor*>(impl);
315 61 : select_impl->close_socket();
316 61 : std::lock_guard lock(state_->mutex_);
317 61 : state_->impl_list_.remove(select_impl);
318 61 : state_->impl_ptrs_.erase(select_impl);
319 61 : }
320 :
321 : inline void
322 120 : select_acceptor_service::close(io_object::handle& h)
323 : {
324 120 : static_cast<select_acceptor*>(h.get())->close_socket();
325 120 : }
326 :
327 : inline std::error_code
328 59 : select_acceptor_service::open_acceptor_socket(
329 : tcp_acceptor::implementation& impl, int family, int type, int protocol)
330 : {
331 59 : auto* select_impl = static_cast<select_acceptor*>(&impl);
332 59 : select_impl->close_socket();
333 :
334 59 : int fd = ::socket(family, type, protocol);
335 59 : if (fd < 0)
336 MIS 0 : return make_err(errno);
337 :
338 HIT 59 : int flags = ::fcntl(fd, F_GETFL, 0);
339 59 : if (flags == -1)
340 : {
341 MIS 0 : int errn = errno;
342 0 : ::close(fd);
343 0 : return make_err(errn);
344 : }
345 HIT 59 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
346 : {
347 MIS 0 : int errn = errno;
348 0 : ::close(fd);
349 0 : return make_err(errn);
350 : }
351 HIT 59 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
352 : {
353 MIS 0 : int errn = errno;
354 0 : ::close(fd);
355 0 : return make_err(errn);
356 : }
357 :
358 HIT 59 : if (fd >= FD_SETSIZE)
359 : {
360 MIS 0 : ::close(fd);
361 0 : return make_err(EMFILE);
362 : }
363 :
364 HIT 59 : if (family == AF_INET6)
365 : {
366 8 : int val = 0; // dual-stack default
367 8 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
368 : }
369 :
370 : #ifdef SO_NOSIGPIPE
371 : {
372 : int nosig = 1;
373 : ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &nosig, sizeof(nosig));
374 : }
375 : #endif
376 :
377 59 : select_impl->fd_ = fd;
378 :
379 : // Set up descriptor state but do NOT register with reactor yet
380 : // (registration happens in do_listen via reactor_acceptor base)
381 59 : select_impl->desc_state_.fd = fd;
382 : {
383 59 : std::lock_guard lock(select_impl->desc_state_.mutex);
384 59 : select_impl->desc_state_.read_op = nullptr;
385 59 : }
386 :
387 59 : return {};
388 : }
389 :
390 : inline std::error_code
391 58 : select_acceptor_service::bind_acceptor(
392 : tcp_acceptor::implementation& impl, endpoint ep)
393 : {
394 58 : return static_cast<select_acceptor*>(&impl)->do_bind(ep);
395 : }
396 :
397 : inline std::error_code
398 57 : select_acceptor_service::listen_acceptor(
399 : tcp_acceptor::implementation& impl, int backlog)
400 : {
401 57 : return static_cast<select_acceptor*>(&impl)->do_listen(backlog);
402 : }
403 :
404 : inline void
405 5 : select_acceptor_service::post(scheduler_op* op)
406 : {
407 5 : state_->sched_.post(op);
408 5 : }
409 :
410 : inline void
411 3085 : select_acceptor_service::work_started() noexcept
412 : {
413 3085 : state_->sched_.work_started();
414 3085 : }
415 :
416 : inline void
417 3 : select_acceptor_service::work_finished() noexcept
418 : {
419 3 : state_->sched_.work_finished();
420 3 : }
421 :
422 : inline select_socket_service*
423 3084 : select_acceptor_service::socket_service() const noexcept
424 : {
425 3084 : auto* svc = ctx_.find_service<detail::socket_service>();
426 3084 : return svc ? dynamic_cast<select_socket_service*>(svc) : nullptr;
427 : }
428 :
429 : } // namespace boost::corosio::detail
430 :
431 : #endif // BOOST_COROSIO_HAS_SELECT
432 :
433 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
|