TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/epoll/epoll_acceptor.hpp>
22 : #include <boost/corosio/native/detail/epoll/epoll_socket_service.hpp>
23 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24 : #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
25 :
26 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
27 :
28 : #include <memory>
29 : #include <mutex>
30 : #include <utility>
31 :
32 : #include <errno.h>
33 : #include <netinet/in.h>
34 : #include <sys/epoll.h>
35 : #include <sys/socket.h>
36 : #include <unistd.h>
37 :
38 : namespace boost::corosio::detail {
39 :
40 : /// State for epoll acceptor service.
41 : using epoll_acceptor_state =
42 : reactor_service_state<epoll_scheduler, epoll_acceptor>;
43 :
44 : /** epoll acceptor service implementation.
45 :
46 : Inherits from acceptor_service to enable runtime polymorphism.
47 : Uses key_type = acceptor_service for service lookup.
48 : */
49 : class BOOST_COROSIO_DECL epoll_acceptor_service final : public acceptor_service
50 : {
51 : public:
52 : explicit epoll_acceptor_service(capy::execution_context& ctx);
53 : ~epoll_acceptor_service() override;
54 :
55 : epoll_acceptor_service(epoll_acceptor_service const&) = delete;
56 : epoll_acceptor_service& operator=(epoll_acceptor_service const&) = delete;
57 :
58 : void shutdown() override;
59 :
60 : io_object::implementation* construct() override;
61 : void destroy(io_object::implementation*) override;
62 : void close(io_object::handle&) override;
63 : std::error_code open_acceptor_socket(
64 : tcp_acceptor::implementation& impl,
65 : int family,
66 : int type,
67 : int protocol) override;
68 : std::error_code
69 : bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
70 : std::error_code
71 : listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
72 :
73 HIT 152 : epoll_scheduler& scheduler() const noexcept
74 : {
75 152 : return state_->sched_;
76 : }
77 : void post(scheduler_op* op);
78 : void work_started() noexcept;
79 : void work_finished() noexcept;
80 :
81 : /** Get the socket service for creating peer sockets during accept. */
82 : epoll_socket_service* socket_service() const noexcept;
83 :
84 : private:
85 : capy::execution_context& ctx_;
86 : std::unique_ptr<epoll_acceptor_state> state_;
87 : };
88 :
89 : inline void
90 6 : epoll_accept_op::cancel() noexcept
91 : {
92 6 : if (acceptor_impl_)
93 6 : acceptor_impl_->cancel_single_op(*this);
94 : else
95 MIS 0 : request_cancel();
96 HIT 6 : }
97 :
98 : inline void
99 4406 : epoll_accept_op::operator()()
100 : {
101 4406 : complete_accept_op<epoll_socket>(*this);
102 4406 : }
103 :
104 80 : inline epoll_acceptor::epoll_acceptor(epoll_acceptor_service& svc) noexcept
105 80 : : reactor_acceptor(svc)
106 : {
107 80 : }
108 :
109 : inline std::coroutine_handle<>
110 4406 : epoll_acceptor::accept(
111 : std::coroutine_handle<> h,
112 : capy::executor_ref ex,
113 : std::stop_token token,
114 : std::error_code* ec,
115 : io_object::implementation** impl_out)
116 : {
117 4406 : auto& op = acc_;
118 4406 : op.reset();
119 4406 : op.h = h;
120 4406 : op.ex = ex;
121 4406 : op.ec_out = ec;
122 4406 : op.impl_out = impl_out;
123 4406 : op.fd = fd_;
124 4406 : op.start(token, this);
125 :
126 4406 : sockaddr_storage peer_storage{};
127 4406 : socklen_t addrlen = sizeof(peer_storage);
128 : int accepted;
129 : do
130 : {
131 4406 : accepted = ::accept4(
132 : fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
133 : SOCK_NONBLOCK | SOCK_CLOEXEC);
134 : }
135 4406 : while (accepted < 0 && errno == EINTR);
136 :
137 4406 : if (accepted >= 0)
138 : {
139 : {
140 2 : std::lock_guard lock(desc_state_.mutex);
141 2 : desc_state_.read_ready = false;
142 2 : }
143 :
144 2 : if (svc_.scheduler().try_consume_inline_budget())
145 : {
146 MIS 0 : auto* socket_svc = svc_.socket_service();
147 0 : if (socket_svc)
148 : {
149 : auto& impl =
150 0 : static_cast<epoll_socket&>(*socket_svc->construct());
151 0 : impl.set_socket(accepted);
152 :
153 0 : impl.desc_state_.fd = accepted;
154 : {
155 0 : std::lock_guard lock(impl.desc_state_.mutex);
156 0 : impl.desc_state_.read_op = nullptr;
157 0 : impl.desc_state_.write_op = nullptr;
158 0 : impl.desc_state_.connect_op = nullptr;
159 0 : }
160 0 : socket_svc->scheduler().register_descriptor(
161 : accepted, &impl.desc_state_);
162 :
163 0 : impl.set_endpoints(
164 : local_endpoint_, from_sockaddr(peer_storage));
165 :
166 0 : *ec = {};
167 0 : if (impl_out)
168 0 : *impl_out = &impl;
169 : }
170 : else
171 : {
172 0 : ::close(accepted);
173 0 : *ec = make_err(ENOENT);
174 0 : if (impl_out)
175 0 : *impl_out = nullptr;
176 : }
177 0 : return dispatch_coro(ex, h);
178 : }
179 :
180 HIT 2 : op.accepted_fd = accepted;
181 2 : op.peer_storage = peer_storage;
182 2 : op.complete(0, 0);
183 2 : op.impl_ptr = shared_from_this();
184 2 : svc_.post(&op);
185 2 : return std::noop_coroutine();
186 : }
187 :
188 4404 : if (errno == EAGAIN || errno == EWOULDBLOCK)
189 : {
190 4404 : op.impl_ptr = shared_from_this();
191 4404 : svc_.work_started();
192 :
193 4404 : std::lock_guard lock(desc_state_.mutex);
194 4404 : bool io_done = false;
195 4404 : if (desc_state_.read_ready)
196 : {
197 MIS 0 : desc_state_.read_ready = false;
198 0 : op.perform_io();
199 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
200 0 : if (!io_done)
201 0 : op.errn = 0;
202 : }
203 :
204 HIT 4404 : if (io_done || op.cancelled.load(std::memory_order_acquire))
205 : {
206 MIS 0 : svc_.post(&op);
207 0 : svc_.work_finished();
208 : }
209 : else
210 : {
211 HIT 4404 : desc_state_.read_op = &op;
212 : }
213 4404 : return std::noop_coroutine();
214 4404 : }
215 :
216 MIS 0 : op.complete(errno, 0);
217 0 : op.impl_ptr = shared_from_this();
218 0 : svc_.post(&op);
219 : // completion is always posted to scheduler queue, never inline.
220 0 : return std::noop_coroutine();
221 : }
222 :
223 : inline void
224 HIT 2 : epoll_acceptor::cancel() noexcept
225 : {
226 2 : do_cancel();
227 2 : }
228 :
229 : inline void
230 318 : epoll_acceptor::close_socket() noexcept
231 : {
232 318 : do_close_socket();
233 318 : }
234 :
235 244 : inline epoll_acceptor_service::epoll_acceptor_service(
236 244 : capy::execution_context& ctx)
237 244 : : ctx_(ctx)
238 244 : , state_(
239 : std::make_unique<epoll_acceptor_state>(
240 244 : ctx.use_service<epoll_scheduler>()))
241 : {
242 244 : }
243 :
244 488 : inline epoll_acceptor_service::~epoll_acceptor_service() {}
245 :
246 : inline void
247 244 : epoll_acceptor_service::shutdown()
248 : {
249 244 : std::lock_guard lock(state_->mutex_);
250 :
251 244 : while (auto* impl = state_->impl_list_.pop_front())
252 MIS 0 : impl->close_socket();
253 :
254 : // Don't clear impl_ptrs_ here — same rationale as
255 : // epoll_socket_service::shutdown(). Let ~state_ release ptrs
256 : // after scheduler shutdown has drained all queued ops.
257 HIT 244 : }
258 :
259 : inline io_object::implementation*
260 80 : epoll_acceptor_service::construct()
261 : {
262 80 : auto impl = std::make_shared<epoll_acceptor>(*this);
263 80 : auto* raw = impl.get();
264 :
265 80 : std::lock_guard lock(state_->mutex_);
266 80 : state_->impl_ptrs_.emplace(raw, std::move(impl));
267 80 : state_->impl_list_.push_back(raw);
268 :
269 80 : return raw;
270 80 : }
271 :
272 : inline void
273 80 : epoll_acceptor_service::destroy(io_object::implementation* impl)
274 : {
275 80 : auto* epoll_impl = static_cast<epoll_acceptor*>(impl);
276 80 : epoll_impl->close_socket();
277 80 : std::lock_guard lock(state_->mutex_);
278 80 : state_->impl_list_.remove(epoll_impl);
279 80 : state_->impl_ptrs_.erase(epoll_impl);
280 80 : }
281 :
282 : inline void
283 159 : epoll_acceptor_service::close(io_object::handle& h)
284 : {
285 159 : static_cast<epoll_acceptor*>(h.get())->close_socket();
286 159 : }
287 :
288 : inline std::error_code
289 79 : epoll_acceptor_service::open_acceptor_socket(
290 : tcp_acceptor::implementation& impl, int family, int type, int protocol)
291 : {
292 79 : auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
293 79 : epoll_impl->close_socket();
294 :
295 79 : int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
296 79 : if (fd < 0)
297 MIS 0 : return make_err(errno);
298 :
299 HIT 79 : if (family == AF_INET6)
300 : {
301 8 : int val = 0; // dual-stack default
302 8 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
303 : }
304 :
305 79 : epoll_impl->fd_ = fd;
306 :
307 : // Set up descriptor state but do NOT register with epoll yet
308 79 : epoll_impl->desc_state_.fd = fd;
309 : {
310 79 : std::lock_guard lock(epoll_impl->desc_state_.mutex);
311 79 : epoll_impl->desc_state_.read_op = nullptr;
312 79 : }
313 :
314 79 : return {};
315 : }
316 :
317 : inline std::error_code
318 78 : epoll_acceptor_service::bind_acceptor(
319 : tcp_acceptor::implementation& impl, endpoint ep)
320 : {
321 78 : return static_cast<epoll_acceptor*>(&impl)->do_bind(ep);
322 : }
323 :
324 : inline std::error_code
325 75 : epoll_acceptor_service::listen_acceptor(
326 : tcp_acceptor::implementation& impl, int backlog)
327 : {
328 75 : return static_cast<epoll_acceptor*>(&impl)->do_listen(backlog);
329 : }
330 :
331 : inline void
332 11 : epoll_acceptor_service::post(scheduler_op* op)
333 : {
334 11 : state_->sched_.post(op);
335 11 : }
336 :
337 : inline void
338 4404 : epoll_acceptor_service::work_started() noexcept
339 : {
340 4404 : state_->sched_.work_started();
341 4404 : }
342 :
343 : inline void
344 9 : epoll_acceptor_service::work_finished() noexcept
345 : {
346 9 : state_->sched_.work_finished();
347 9 : }
348 :
349 : inline epoll_socket_service*
350 4397 : epoll_acceptor_service::socket_service() const noexcept
351 : {
352 4397 : auto* svc = ctx_.find_service<detail::socket_service>();
353 4397 : return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
354 : }
355 :
356 : } // namespace boost::corosio::detail
357 :
358 : #endif // BOOST_COROSIO_HAS_EPOLL
359 :
360 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
|