include/boost/corosio/native/detail/epoll/epoll_socket_service.hpp

86.3% Lines (82/95) 91.3% List of functions (21/23)
f(x) Functions (23)
Function Calls Lines Branches Blocks
boost::corosio::detail::epoll_socket_service::scheduler() const :117 0 100.0% boost::corosio::detail::epoll_connect_op::cancel() :130 0 0.0% boost::corosio::detail::epoll_read_op::cancel() :139 0 80.0% boost::corosio::detail::epoll_write_op::cancel() :148 0 0.0% boost::corosio::detail::epoll_op::operator()() :157 0 100.0% boost::corosio::detail::epoll_connect_op::operator()() :163 0 100.0% boost::corosio::detail::epoll_socket::epoll_socket(boost::corosio::detail::epoll_socket_service&) :168 0 100.0% boost::corosio::detail::epoll_socket::~epoll_socket() :173 0 100.0% boost::corosio::detail::epoll_socket::connect(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::endpoint, std::stop_token, std::error_code*) :176 0 100.0% boost::corosio::detail::epoll_socket::read_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :187 0 100.0% boost::corosio::detail::epoll_socket::write_some(std::__n4861::coroutine_handle<void>, boost::capy::executor_ref, boost::corosio::buffer_param, std::stop_token, std::error_code*, unsigned long*) :199 0 100.0% boost::corosio::detail::epoll_socket::cancel() :211 0 100.0% boost::corosio::detail::epoll_socket::close_socket() :217 0 100.0% boost::corosio::detail::epoll_socket_service::epoll_socket_service(boost::capy::execution_context&) :222 0 100.0% boost::corosio::detail::epoll_socket_service::~epoll_socket_service() :229 0 100.0% boost::corosio::detail::epoll_socket_service::shutdown() :232 0 80.0% boost::corosio::detail::epoll_socket_service::construct() :249 0 100.0% boost::corosio::detail::epoll_socket_service::destroy(boost::corosio::io_object::implementation*) :264 0 100.0% boost::corosio::detail::epoll_socket_service::open_socket(boost::corosio::tcp_socket::implementation&, int, int, int) :274 0 94.4% boost::corosio::detail::epoll_socket_service::close(boost::corosio::io_object::handle&) :306 0 100.0% boost::corosio::detail::epoll_socket_service::post(boost::corosio::detail::scheduler_op*) :312 0 100.0% boost::corosio::detail::epoll_socket_service::work_started() :318 0 100.0% boost::corosio::detail::epoll_socket_service::work_finished() :324 0 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23 #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
24
25 #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26
27 #include <coroutine>
28 #include <mutex>
29 #include <utility>
30
31 #include <errno.h>
32 #include <netinet/in.h>
33 #include <netinet/tcp.h>
34 #include <sys/epoll.h>
35 #include <sys/socket.h>
36 #include <unistd.h>
37
38 /*
39 epoll Socket Implementation
40 ===========================
41
42 Each I/O operation follows the same pattern:
43 1. Try the syscall immediately (non-blocking socket)
44 2. If it succeeds or fails with a real error, post to completion queue
45 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
46
47 This "try first" approach avoids unnecessary epoll round-trips for
48 operations that can complete immediately (common for small reads/writes
49 on fast local connections).
50
51 One-Shot Registration
52 ---------------------
53 We use one-shot epoll registration: each operation registers, waits for
54 one event, then unregisters. This simplifies the state machine since we
55 don't need to track whether an fd is currently registered or handle
56 re-arming. The tradeoff is slightly more epoll_ctl calls, but the
57 simplicity is worth it.
58
59 Cancellation
60 ------------
61 See op.hpp for the completion/cancellation race handling via the
62 `registered` atomic. cancel() must complete pending operations (post
63 them with cancelled flag) so coroutines waiting on them can resume.
64 close_socket() calls cancel() first to ensure this.
65
66 Impl Lifetime with shared_ptr
67 -----------------------------
68 Socket impls use enable_shared_from_this. The service owns impls via
69 shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and
70 removal. When a user calls close(), we call cancel() which posts pending
71 ops to the scheduler.
72
73 CRITICAL: The posted ops must keep the impl alive until they complete.
74 Otherwise the scheduler would process a freed op (use-after-free). The
75 cancel() method captures shared_from_this() into op.impl_ptr before
76 posting. When the op completes, impl_ptr is cleared, allowing the impl
77 to be destroyed if no other references exist.
78
79 Service Ownership
80 -----------------
81 epoll_socket_service owns all socket impls. destroy_impl() removes the
82 shared_ptr from the map, but the impl may survive if ops still hold
83 impl_ptr refs. shutdown() closes all sockets and clears the map; any
84 in-flight ops will complete and release their refs.
85 */
86
87 namespace boost::corosio::detail {
88
89 /// State for epoll socket service.
90 using epoll_socket_state = reactor_service_state<epoll_scheduler, epoll_socket>;
91
92 /** epoll socket service implementation.
93
94 Inherits from socket_service to enable runtime polymorphism.
95 Uses key_type = socket_service for service lookup.
96 */
97 class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
98 {
99 public:
100 explicit epoll_socket_service(capy::execution_context& ctx);
101 ~epoll_socket_service() override;
102
103 epoll_socket_service(epoll_socket_service const&) = delete;
104 epoll_socket_service& operator=(epoll_socket_service const&) = delete;
105
106 void shutdown() override;
107
108 io_object::implementation* construct() override;
109 void destroy(io_object::implementation*) override;
110 void close(io_object::handle&) override;
111 std::error_code open_socket(
112 tcp_socket::implementation& impl,
113 int family,
114 int type,
115 int protocol) override;
116
117 272069x epoll_scheduler& scheduler() const noexcept
118 {
119 272069x return state_->sched_;
120 }
121 void post(scheduler_op* op);
122 void work_started() noexcept;
123 void work_finished() noexcept;
124
125 private:
126 std::unique_ptr<epoll_socket_state> state_;
127 };
128
129 inline void
130 epoll_connect_op::cancel() noexcept
131 {
132 if (socket_impl_)
133 socket_impl_->cancel_single_op(*this);
134 else
135 request_cancel();
136 }
137
138 inline void
139 99x epoll_read_op::cancel() noexcept
140 {
141 99x if (socket_impl_)
142 99x socket_impl_->cancel_single_op(*this);
143 else
144 request_cancel();
145 99x }
146
147 inline void
148 epoll_write_op::cancel() noexcept
149 {
150 if (socket_impl_)
151 socket_impl_->cancel_single_op(*this);
152 else
153 request_cancel();
154 }
155
156 inline void
157 51065x epoll_op::operator()()
158 {
159 51065x complete_io_op(*this);
160 51065x }
161
162 inline void
163 4399x epoll_connect_op::operator()()
164 {
165 4399x complete_connect_op(*this);
166 4399x }
167
168 13254x inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
169 13254x : reactor_socket(svc)
170 {
171 13254x }
172
173 13254x inline epoll_socket::~epoll_socket() = default;
174
175 inline std::coroutine_handle<>
176 4399x epoll_socket::connect(
177 std::coroutine_handle<> h,
178 capy::executor_ref ex,
179 endpoint ep,
180 std::stop_token token,
181 std::error_code* ec)
182 {
183 4399x return do_connect(h, ex, ep, token, ec);
184 }
185
186 inline std::coroutine_handle<>
187 127402x epoll_socket::read_some(
188 std::coroutine_handle<> h,
189 capy::executor_ref ex,
190 buffer_param param,
191 std::stop_token token,
192 std::error_code* ec,
193 std::size_t* bytes_out)
194 {
195 127402x return do_read_some(h, ex, param, token, ec, bytes_out);
196 }
197
198 inline std::coroutine_handle<>
199 127249x epoll_socket::write_some(
200 std::coroutine_handle<> h,
201 capy::executor_ref ex,
202 buffer_param param,
203 std::stop_token token,
204 std::error_code* ec,
205 std::size_t* bytes_out)
206 {
207 127249x return do_write_some(h, ex, param, token, ec, bytes_out);
208 }
209
210 inline void
211 96x epoll_socket::cancel() noexcept
212 {
213 96x do_cancel();
214 96x }
215
216 inline void
217 39733x epoll_socket::close_socket() noexcept
218 {
219 39733x do_close_socket();
220 39733x }
221
222 244x inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
223 244x : state_(
224 std::make_unique<epoll_socket_state>(
225 244x ctx.use_service<epoll_scheduler>()))
226 {
227 244x }
228
229 488x inline epoll_socket_service::~epoll_socket_service() {}
230
231 inline void
232 244x epoll_socket_service::shutdown()
233 {
234 244x std::lock_guard lock(state_->mutex_);
235
236 244x while (auto* impl = state_->impl_list_.pop_front())
237 impl->close_socket();
238
239 // Don't clear impl_ptrs_ here. The scheduler shuts down after us and
240 // drains completed_ops_, calling destroy() on each queued op. If we
241 // released our shared_ptrs now, an epoll_op::destroy() could free the
242 // last ref to an impl whose embedded descriptor_state is still linked
243 // in the queue — use-after-free on the next pop(). Letting ~state_
244 // release the ptrs (during service destruction, after scheduler
245 // shutdown) keeps every impl alive until all ops have been drained.
246 244x }
247
248 inline io_object::implementation*
249 13254x epoll_socket_service::construct()
250 {
251 13254x auto impl = std::make_shared<epoll_socket>(*this);
252 13254x auto* raw = impl.get();
253
254 {
255 13254x std::lock_guard lock(state_->mutex_);
256 13254x state_->impl_ptrs_.emplace(raw, std::move(impl));
257 13254x state_->impl_list_.push_back(raw);
258 13254x }
259
260 13254x return raw;
261 13254x }
262
263 inline void
264 13254x epoll_socket_service::destroy(io_object::implementation* impl)
265 {
266 13254x auto* epoll_impl = static_cast<epoll_socket*>(impl);
267 13254x epoll_impl->close_socket();
268 13254x std::lock_guard lock(state_->mutex_);
269 13254x state_->impl_list_.remove(epoll_impl);
270 13254x state_->impl_ptrs_.erase(epoll_impl);
271 13254x }
272
273 inline std::error_code
274 4414x epoll_socket_service::open_socket(
275 tcp_socket::implementation& impl, int family, int type, int protocol)
276 {
277 4414x auto* epoll_impl = static_cast<epoll_socket*>(&impl);
278 4414x epoll_impl->close_socket();
279
280 4414x int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
281 4414x if (fd < 0)
282 return make_err(errno);
283
284 4414x if (family == AF_INET6)
285 {
286 5x int one = 1;
287 5x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
288 }
289
290 4414x epoll_impl->fd_ = fd;
291
292 // Register fd with epoll (edge-triggered mode)
293 4414x epoll_impl->desc_state_.fd = fd;
294 {
295 4414x std::lock_guard lock(epoll_impl->desc_state_.mutex);
296 4414x epoll_impl->desc_state_.read_op = nullptr;
297 4414x epoll_impl->desc_state_.write_op = nullptr;
298 4414x epoll_impl->desc_state_.connect_op = nullptr;
299 4414x }
300 4414x scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
301
302 4414x return {};
303 }
304
305 inline void
306 22065x epoll_socket_service::close(io_object::handle& h)
307 {
308 22065x static_cast<epoll_socket*>(h.get())->close_socket();
309 22065x }
310
311 inline void
312 51013x epoll_socket_service::post(scheduler_op* op)
313 {
314 51013x state_->sched_.post(op);
315 51013x }
316
317 inline void
318 4601x epoll_socket_service::work_started() noexcept
319 {
320 4601x state_->sched_.work_started();
321 4601x }
322
323 inline void
324 150x epoll_socket_service::work_finished() noexcept
325 {
326 150x state_->sched_.work_finished();
327 150x }
328
329 } // namespace boost::corosio::detail
330
331 #endif // BOOST_COROSIO_HAS_EPOLL
332
333 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
334