TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/socket_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23 : #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
24 :
25 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26 :
27 : #include <coroutine>
28 : #include <mutex>
29 : #include <utility>
30 :
31 : #include <errno.h>
32 : #include <netinet/in.h>
33 : #include <netinet/tcp.h>
34 : #include <sys/epoll.h>
35 : #include <sys/socket.h>
36 : #include <unistd.h>
37 :
38 : /*
39 : epoll Socket Implementation
40 : ===========================
41 :
42 : Each I/O operation follows the same pattern:
43 : 1. Try the syscall immediately (non-blocking socket)
44 : 2. If it succeeds or fails with a real error, post to completion queue
45 : 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
46 :
47 : This "try first" approach avoids unnecessary epoll round-trips for
48 : operations that can complete immediately (common for small reads/writes
49 : on fast local connections).
50 :
51 : One-Shot Registration
52 : ---------------------
53 : We use one-shot epoll registration: each operation registers, waits for
54 : one event, then unregisters. This simplifies the state machine since we
55 : don't need to track whether an fd is currently registered or handle
56 : re-arming. The tradeoff is slightly more epoll_ctl calls, but the
57 : simplicity is worth it.
58 :
59 : Cancellation
60 : ------------
61 : See op.hpp for the completion/cancellation race handling via the
62 : `registered` atomic. cancel() must complete pending operations (post
63 : them with cancelled flag) so coroutines waiting on them can resume.
64 : close_socket() calls cancel() first to ensure this.
65 :
66 : Impl Lifetime with shared_ptr
67 : -----------------------------
68 : Socket impls use enable_shared_from_this. The service owns impls via
69 : shared_ptr maps (impl_ptrs_) keyed by raw pointer for O(1) lookup and
70 : removal. When a user calls close(), we call cancel() which posts pending
71 : ops to the scheduler.
72 :
73 : CRITICAL: The posted ops must keep the impl alive until they complete.
74 : Otherwise the scheduler would process a freed op (use-after-free). The
75 : cancel() method captures shared_from_this() into op.impl_ptr before
76 : posting. When the op completes, impl_ptr is cleared, allowing the impl
77 : to be destroyed if no other references exist.
78 :
79 : Service Ownership
80 : -----------------
81 : epoll_socket_service owns all socket impls. destroy_impl() removes the
82 : shared_ptr from the map, but the impl may survive if ops still hold
83 : impl_ptr refs. shutdown() closes all sockets and clears the map; any
84 : in-flight ops will complete and release their refs.
85 : */
86 :
87 : namespace boost::corosio::detail {
88 :
89 : /// State for epoll socket service.
90 : using epoll_socket_state = reactor_service_state<epoll_scheduler, epoll_socket>;
91 :
92 : /** epoll socket service implementation.
93 :
94 : Inherits from socket_service to enable runtime polymorphism.
95 : Uses key_type = socket_service for service lookup.
96 : */
97 : class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
98 : {
99 : public:
100 : explicit epoll_socket_service(capy::execution_context& ctx);
101 : ~epoll_socket_service() override;
102 :
103 : epoll_socket_service(epoll_socket_service const&) = delete;
104 : epoll_socket_service& operator=(epoll_socket_service const&) = delete;
105 :
106 : void shutdown() override;
107 :
108 : io_object::implementation* construct() override;
109 : void destroy(io_object::implementation*) override;
110 : void close(io_object::handle&) override;
111 : std::error_code open_socket(
112 : tcp_socket::implementation& impl,
113 : int family,
114 : int type,
115 : int protocol) override;
116 :
117 HIT 272069 : epoll_scheduler& scheduler() const noexcept
118 : {
119 272069 : return state_->sched_;
120 : }
121 : void post(scheduler_op* op);
122 : void work_started() noexcept;
123 : void work_finished() noexcept;
124 :
125 : private:
126 : std::unique_ptr<epoll_socket_state> state_;
127 : };
128 :
129 : inline void
130 MIS 0 : epoll_connect_op::cancel() noexcept
131 : {
132 0 : if (socket_impl_)
133 0 : socket_impl_->cancel_single_op(*this);
134 : else
135 0 : request_cancel();
136 0 : }
137 :
138 : inline void
139 HIT 99 : epoll_read_op::cancel() noexcept
140 : {
141 99 : if (socket_impl_)
142 99 : socket_impl_->cancel_single_op(*this);
143 : else
144 MIS 0 : request_cancel();
145 HIT 99 : }
146 :
147 : inline void
148 MIS 0 : epoll_write_op::cancel() noexcept
149 : {
150 0 : if (socket_impl_)
151 0 : socket_impl_->cancel_single_op(*this);
152 : else
153 0 : request_cancel();
154 0 : }
155 :
156 : inline void
157 HIT 51065 : epoll_op::operator()()
158 : {
159 51065 : complete_io_op(*this);
160 51065 : }
161 :
162 : inline void
163 4399 : epoll_connect_op::operator()()
164 : {
165 4399 : complete_connect_op(*this);
166 4399 : }
167 :
168 13254 : inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
169 13254 : : reactor_socket(svc)
170 : {
171 13254 : }
172 :
173 13254 : inline epoll_socket::~epoll_socket() = default;
174 :
175 : inline std::coroutine_handle<>
176 4399 : epoll_socket::connect(
177 : std::coroutine_handle<> h,
178 : capy::executor_ref ex,
179 : endpoint ep,
180 : std::stop_token token,
181 : std::error_code* ec)
182 : {
183 4399 : return do_connect(h, ex, ep, token, ec);
184 : }
185 :
186 : inline std::coroutine_handle<>
187 127402 : epoll_socket::read_some(
188 : std::coroutine_handle<> h,
189 : capy::executor_ref ex,
190 : buffer_param param,
191 : std::stop_token token,
192 : std::error_code* ec,
193 : std::size_t* bytes_out)
194 : {
195 127402 : return do_read_some(h, ex, param, token, ec, bytes_out);
196 : }
197 :
198 : inline std::coroutine_handle<>
199 127249 : epoll_socket::write_some(
200 : std::coroutine_handle<> h,
201 : capy::executor_ref ex,
202 : buffer_param param,
203 : std::stop_token token,
204 : std::error_code* ec,
205 : std::size_t* bytes_out)
206 : {
207 127249 : return do_write_some(h, ex, param, token, ec, bytes_out);
208 : }
209 :
210 : inline void
211 96 : epoll_socket::cancel() noexcept
212 : {
213 96 : do_cancel();
214 96 : }
215 :
216 : inline void
217 39733 : epoll_socket::close_socket() noexcept
218 : {
219 39733 : do_close_socket();
220 39733 : }
221 :
222 244 : inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
223 244 : : state_(
224 : std::make_unique<epoll_socket_state>(
225 244 : ctx.use_service<epoll_scheduler>()))
226 : {
227 244 : }
228 :
229 488 : inline epoll_socket_service::~epoll_socket_service() {}
230 :
231 : inline void
232 244 : epoll_socket_service::shutdown()
233 : {
234 244 : std::lock_guard lock(state_->mutex_);
235 :
236 244 : while (auto* impl = state_->impl_list_.pop_front())
237 MIS 0 : impl->close_socket();
238 :
239 : // Don't clear impl_ptrs_ here. The scheduler shuts down after us and
240 : // drains completed_ops_, calling destroy() on each queued op. If we
241 : // released our shared_ptrs now, an epoll_op::destroy() could free the
242 : // last ref to an impl whose embedded descriptor_state is still linked
243 : // in the queue — use-after-free on the next pop(). Letting ~state_
244 : // release the ptrs (during service destruction, after scheduler
245 : // shutdown) keeps every impl alive until all ops have been drained.
246 HIT 244 : }
247 :
248 : inline io_object::implementation*
249 13254 : epoll_socket_service::construct()
250 : {
251 13254 : auto impl = std::make_shared<epoll_socket>(*this);
252 13254 : auto* raw = impl.get();
253 :
254 : {
255 13254 : std::lock_guard lock(state_->mutex_);
256 13254 : state_->impl_ptrs_.emplace(raw, std::move(impl));
257 13254 : state_->impl_list_.push_back(raw);
258 13254 : }
259 :
260 13254 : return raw;
261 13254 : }
262 :
263 : inline void
264 13254 : epoll_socket_service::destroy(io_object::implementation* impl)
265 : {
266 13254 : auto* epoll_impl = static_cast<epoll_socket*>(impl);
267 13254 : epoll_impl->close_socket();
268 13254 : std::lock_guard lock(state_->mutex_);
269 13254 : state_->impl_list_.remove(epoll_impl);
270 13254 : state_->impl_ptrs_.erase(epoll_impl);
271 13254 : }
272 :
273 : inline std::error_code
274 4414 : epoll_socket_service::open_socket(
275 : tcp_socket::implementation& impl, int family, int type, int protocol)
276 : {
277 4414 : auto* epoll_impl = static_cast<epoll_socket*>(&impl);
278 4414 : epoll_impl->close_socket();
279 :
280 4414 : int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
281 4414 : if (fd < 0)
282 MIS 0 : return make_err(errno);
283 :
284 HIT 4414 : if (family == AF_INET6)
285 : {
286 5 : int one = 1;
287 5 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
288 : }
289 :
290 4414 : epoll_impl->fd_ = fd;
291 :
292 : // Register fd with epoll (edge-triggered mode)
293 4414 : epoll_impl->desc_state_.fd = fd;
294 : {
295 4414 : std::lock_guard lock(epoll_impl->desc_state_.mutex);
296 4414 : epoll_impl->desc_state_.read_op = nullptr;
297 4414 : epoll_impl->desc_state_.write_op = nullptr;
298 4414 : epoll_impl->desc_state_.connect_op = nullptr;
299 4414 : }
300 4414 : scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
301 :
302 4414 : return {};
303 : }
304 :
305 : inline void
306 22065 : epoll_socket_service::close(io_object::handle& h)
307 : {
308 22065 : static_cast<epoll_socket*>(h.get())->close_socket();
309 22065 : }
310 :
311 : inline void
312 51013 : epoll_socket_service::post(scheduler_op* op)
313 : {
314 51013 : state_->sched_.post(op);
315 51013 : }
316 :
317 : inline void
318 4601 : epoll_socket_service::work_started() noexcept
319 : {
320 4601 : state_->sched_.work_started();
321 4601 : }
322 :
323 : inline void
324 150 : epoll_socket_service::work_finished() noexcept
325 : {
326 150 : state_->sched_.work_finished();
327 150 : }
328 :
329 : } // namespace boost::corosio::detail
330 :
331 : #endif // BOOST_COROSIO_HAS_EPOLL
332 :
333 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
|