include/boost/corosio/native/detail/epoll/epoll_acceptor_service.hpp

76.6% Lines (111/145) 100.0% List of functions (20/20)
f(x) Functions (20)
Function Calls Lines Branches Blocks
boost::corosio::detail::epoll_acceptor_service::scheduler() const :73 0 100.0% boost::corosio::detail::epoll_accept_op::cancel() :90 0 80.0% boost::corosio::detail::epoll_accept_op::operator()() :99 0 100.0% boost::corosio::detail::epoll_acceptor::epoll_acceptor(boost::corosio::detail::epoll_acceptor_service&) :104 0 100.0% boost::corosio::detail::epoll_acceptor::accept(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, std::stop_token, std::error_code*, boost::corosio::io_object::implementation**) :110 0 52.3% boost::corosio::detail::epoll_acceptor::cancel() :224 0 100.0% boost::corosio::detail::epoll_acceptor::close_socket() :230 0 100.0% boost::corosio::detail::epoll_acceptor_service::epoll_acceptor_service(boost::capy::execution_context&) :235 0 100.0% boost::corosio::detail::epoll_acceptor_service::~epoll_acceptor_service() :244 0 100.0% boost::corosio::detail::epoll_acceptor_service::shutdown() :247 0 80.0% boost::corosio::detail::epoll_acceptor_service::construct() :260 0 100.0% boost::corosio::detail::epoll_acceptor_service::destroy(boost::corosio::io_object::implementation*) :273 0 100.0% boost::corosio::detail::epoll_acceptor_service::close(boost::corosio::io_object::handle&) :283 0 100.0% boost::corosio::detail::epoll_acceptor_service::open_acceptor_socket(boost::corosio::tcp_acceptor::implementation&, int, int, int) :289 0 93.3% boost::corosio::detail::epoll_acceptor_service::bind_acceptor(boost::corosio::tcp_acceptor::implementation&, boost::corosio::endpoint) :318 0 100.0% boost::corosio::detail::epoll_acceptor_service::listen_acceptor(boost::corosio::tcp_acceptor::implementation&, int) :325 0 100.0% boost::corosio::detail::epoll_acceptor_service::post(boost::corosio::detail::scheduler_op*) :332 0 100.0% boost::corosio::detail::epoll_acceptor_service::work_started() :338 0 100.0% boost::corosio::detail::epoll_acceptor_service::work_finished() :344 0 100.0% boost::corosio::detail::epoll_acceptor_service::socket_service() const :350 0 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_acceptor.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_socket_service.hpp>
23 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24 #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
25
26 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
27
28 #include <memory>
29 #include <mutex>
30 #include <utility>
31
32 #include <errno.h>
33 #include <netinet/in.h>
34 #include <sys/epoll.h>
35 #include <sys/socket.h>
36 #include <unistd.h>
37
38 namespace boost::corosio::detail {
39
40 /// State for epoll acceptor service.
41 using epoll_acceptor_state =
42 reactor_service_state<epoll_scheduler, epoll_acceptor>;
43
44 /** epoll acceptor service implementation.
45
46 Inherits from acceptor_service to enable runtime polymorphism.
47 Uses key_type = acceptor_service for service lookup.
48 */
49 class BOOST_COROSIO_DECL epoll_acceptor_service final : public acceptor_service
50 {
51 public:
52 explicit epoll_acceptor_service(capy::execution_context& ctx);
53 ~epoll_acceptor_service() override;
54
55 epoll_acceptor_service(epoll_acceptor_service const&) = delete;
56 epoll_acceptor_service& operator=(epoll_acceptor_service const&) = delete;
57
58 void shutdown() override;
59
60 io_object::implementation* construct() override;
61 void destroy(io_object::implementation*) override;
62 void close(io_object::handle&) override;
63 std::error_code open_acceptor_socket(
64 tcp_acceptor::implementation& impl,
65 int family,
66 int type,
67 int protocol) override;
68 std::error_code
69 bind_acceptor(tcp_acceptor::implementation& impl, endpoint ep) override;
70 std::error_code
71 listen_acceptor(tcp_acceptor::implementation& impl, int backlog) override;
72
73 152x epoll_scheduler& scheduler() const noexcept
74 {
75 152x return state_->sched_;
76 }
77 void post(scheduler_op* op);
78 void work_started() noexcept;
79 void work_finished() noexcept;
80
81 /** Get the socket service for creating peer sockets during accept. */
82 epoll_socket_service* socket_service() const noexcept;
83
84 private:
85 capy::execution_context& ctx_;
86 std::unique_ptr<epoll_acceptor_state> state_;
87 };
88
89 inline void
90 6x epoll_accept_op::cancel() noexcept
91 {
92 6x if (acceptor_impl_)
93 6x acceptor_impl_->cancel_single_op(*this);
94 else
95 request_cancel();
96 6x }
97
98 inline void
99 4406x epoll_accept_op::operator()()
100 {
101 4406x complete_accept_op<epoll_socket>(*this);
102 4406x }
103
104 80x inline epoll_acceptor::epoll_acceptor(epoll_acceptor_service& svc) noexcept
105 80x : reactor_acceptor(svc)
106 {
107 80x }
108
109 inline std::coroutine_handle<>
110 4406x epoll_acceptor::accept(
111 std::coroutine_handle<> h,
112 capy::executor_ref ex,
113 std::stop_token token,
114 std::error_code* ec,
115 io_object::implementation** impl_out)
116 {
117 4406x auto& op = acc_;
118 4406x op.reset();
119 4406x op.h = h;
120 4406x op.ex = ex;
121 4406x op.ec_out = ec;
122 4406x op.impl_out = impl_out;
123 4406x op.fd = fd_;
124 4406x op.start(token, this);
125
126 4406x sockaddr_storage peer_storage{};
127 4406x socklen_t addrlen = sizeof(peer_storage);
128 int accepted;
129 do
130 {
131 4406x accepted = ::accept4(
132 fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
133 SOCK_NONBLOCK | SOCK_CLOEXEC);
134 }
135 4406x while (accepted < 0 && errno == EINTR);
136
137 4406x if (accepted >= 0)
138 {
139 {
140 2x std::lock_guard lock(desc_state_.mutex);
141 2x desc_state_.read_ready = false;
142 2x }
143
144 2x if (svc_.scheduler().try_consume_inline_budget())
145 {
146 auto* socket_svc = svc_.socket_service();
147 if (socket_svc)
148 {
149 auto& impl =
150 static_cast<epoll_socket&>(*socket_svc->construct());
151 impl.set_socket(accepted);
152
153 impl.desc_state_.fd = accepted;
154 {
155 std::lock_guard lock(impl.desc_state_.mutex);
156 impl.desc_state_.read_op = nullptr;
157 impl.desc_state_.write_op = nullptr;
158 impl.desc_state_.connect_op = nullptr;
159 }
160 socket_svc->scheduler().register_descriptor(
161 accepted, &impl.desc_state_);
162
163 impl.set_endpoints(
164 local_endpoint_, from_sockaddr(peer_storage));
165
166 *ec = {};
167 if (impl_out)
168 *impl_out = &impl;
169 }
170 else
171 {
172 ::close(accepted);
173 *ec = make_err(ENOENT);
174 if (impl_out)
175 *impl_out = nullptr;
176 }
177 return dispatch_coro(ex, h);
178 }
179
180 2x op.accepted_fd = accepted;
181 2x op.peer_storage = peer_storage;
182 2x op.complete(0, 0);
183 2x op.impl_ptr = shared_from_this();
184 2x svc_.post(&op);
185 2x return std::noop_coroutine();
186 }
187
188 4404x if (errno == EAGAIN || errno == EWOULDBLOCK)
189 {
190 4404x op.impl_ptr = shared_from_this();
191 4404x svc_.work_started();
192
193 4404x std::lock_guard lock(desc_state_.mutex);
194 4404x bool io_done = false;
195 4404x if (desc_state_.read_ready)
196 {
197 desc_state_.read_ready = false;
198 op.perform_io();
199 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
200 if (!io_done)
201 op.errn = 0;
202 }
203
204 4404x if (io_done || op.cancelled.load(std::memory_order_acquire))
205 {
206 svc_.post(&op);
207 svc_.work_finished();
208 }
209 else
210 {
211 4404x desc_state_.read_op = &op;
212 }
213 4404x return std::noop_coroutine();
214 4404x }
215
216 op.complete(errno, 0);
217 op.impl_ptr = shared_from_this();
218 svc_.post(&op);
219 // completion is always posted to scheduler queue, never inline.
220 return std::noop_coroutine();
221 }
222
223 inline void
224 2x epoll_acceptor::cancel() noexcept
225 {
226 2x do_cancel();
227 2x }
228
229 inline void
230 318x epoll_acceptor::close_socket() noexcept
231 {
232 318x do_close_socket();
233 318x }
234
235 244x inline epoll_acceptor_service::epoll_acceptor_service(
236 244x capy::execution_context& ctx)
237 244x : ctx_(ctx)
238 244x , state_(
239 std::make_unique<epoll_acceptor_state>(
240 244x ctx.use_service<epoll_scheduler>()))
241 {
242 244x }
243
244 488x inline epoll_acceptor_service::~epoll_acceptor_service() {}
245
246 inline void
247 244x epoll_acceptor_service::shutdown()
248 {
249 244x std::lock_guard lock(state_->mutex_);
250
251 244x while (auto* impl = state_->impl_list_.pop_front())
252 impl->close_socket();
253
254 // Don't clear impl_ptrs_ here — same rationale as
255 // epoll_socket_service::shutdown(). Let ~state_ release ptrs
256 // after scheduler shutdown has drained all queued ops.
257 244x }
258
259 inline io_object::implementation*
260 80x epoll_acceptor_service::construct()
261 {
262 80x auto impl = std::make_shared<epoll_acceptor>(*this);
263 80x auto* raw = impl.get();
264
265 80x std::lock_guard lock(state_->mutex_);
266 80x state_->impl_ptrs_.emplace(raw, std::move(impl));
267 80x state_->impl_list_.push_back(raw);
268
269 80x return raw;
270 80x }
271
272 inline void
273 80x epoll_acceptor_service::destroy(io_object::implementation* impl)
274 {
275 80x auto* epoll_impl = static_cast<epoll_acceptor*>(impl);
276 80x epoll_impl->close_socket();
277 80x std::lock_guard lock(state_->mutex_);
278 80x state_->impl_list_.remove(epoll_impl);
279 80x state_->impl_ptrs_.erase(epoll_impl);
280 80x }
281
282 inline void
283 159x epoll_acceptor_service::close(io_object::handle& h)
284 {
285 159x static_cast<epoll_acceptor*>(h.get())->close_socket();
286 159x }
287
288 inline std::error_code
289 79x epoll_acceptor_service::open_acceptor_socket(
290 tcp_acceptor::implementation& impl, int family, int type, int protocol)
291 {
292 79x auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
293 79x epoll_impl->close_socket();
294
295 79x int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
296 79x if (fd < 0)
297 return make_err(errno);
298
299 79x if (family == AF_INET6)
300 {
301 8x int val = 0; // dual-stack default
302 8x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
303 }
304
305 79x epoll_impl->fd_ = fd;
306
307 // Set up descriptor state but do NOT register with epoll yet
308 79x epoll_impl->desc_state_.fd = fd;
309 {
310 79x std::lock_guard lock(epoll_impl->desc_state_.mutex);
311 79x epoll_impl->desc_state_.read_op = nullptr;
312 79x }
313
314 79x return {};
315 }
316
317 inline std::error_code
318 78x epoll_acceptor_service::bind_acceptor(
319 tcp_acceptor::implementation& impl, endpoint ep)
320 {
321 78x return static_cast<epoll_acceptor*>(&impl)->do_bind(ep);
322 }
323
324 inline std::error_code
325 75x epoll_acceptor_service::listen_acceptor(
326 tcp_acceptor::implementation& impl, int backlog)
327 {
328 75x return static_cast<epoll_acceptor*>(&impl)->do_listen(backlog);
329 }
330
331 inline void
332 11x epoll_acceptor_service::post(scheduler_op* op)
333 {
334 11x state_->sched_.post(op);
335 11x }
336
337 inline void
338 4404x epoll_acceptor_service::work_started() noexcept
339 {
340 4404x state_->sched_.work_started();
341 4404x }
342
343 inline void
344 9x epoll_acceptor_service::work_finished() noexcept
345 {
346 9x state_->sched_.work_finished();
347 9x }
348
349 inline epoll_socket_service*
350 4397x epoll_acceptor_service::socket_service() const noexcept
351 {
352 4397x auto* svc = ctx_.find_service<detail::socket_service>();
353 4397x return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
354 }
355
356 } // namespace boost::corosio::detail
357
358 #endif // BOOST_COROSIO_HAS_EPOLL
359
360 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
361