TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 :
20 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21 :
22 : #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23 : #include <boost/corosio/detail/timer_service.hpp>
24 : #include <boost/corosio/native/detail/make_err.hpp>
25 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 :
28 : #include <boost/corosio/detail/except.hpp>
29 :
30 : #include <atomic>
31 : #include <chrono>
32 : #include <cstdint>
33 : #include <mutex>
34 :
35 : #include <errno.h>
36 : #include <sys/epoll.h>
37 : #include <sys/eventfd.h>
38 : #include <sys/timerfd.h>
39 : #include <unistd.h>
40 :
41 : namespace boost::corosio::detail {
42 :
43 : struct epoll_op;
44 : struct descriptor_state;
45 :
46 : /** Linux scheduler using epoll for I/O multiplexing.
47 :
48 : This scheduler implements the scheduler interface using Linux epoll
49 : for efficient I/O event notification. It uses a single reactor model
50 : where one thread runs epoll_wait while other threads
51 : wait on a condition variable for handler work. This design provides:
52 :
53 : - Handler parallelism: N posted handlers can execute on N threads
54 : - No thundering herd: condition_variable wakes exactly one thread
55 : - IOCP parity: Behavior matches Windows I/O completion port semantics
56 :
57 : When threads call run(), they first try to execute queued handlers.
58 : If the queue is empty and no reactor is running, one thread becomes
59 : the reactor and runs epoll_wait. Other threads wait on a condition
60 : variable until handlers are available.
61 :
62 : @par Thread Safety
63 : All public member functions are thread-safe.
64 : */
65 : class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
66 : {
67 : public:
68 : /** Construct the scheduler.
69 :
70 : Creates an epoll instance, eventfd for reactor interruption,
71 : and timerfd for kernel-managed timer expiry.
72 :
73 : @param ctx Reference to the owning execution_context.
74 : @param concurrency_hint Hint for expected thread count (unused).
75 : */
76 : epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);
77 :
78 : /// Destroy the scheduler.
79 : ~epoll_scheduler() override;
80 :
81 : epoll_scheduler(epoll_scheduler const&) = delete;
82 : epoll_scheduler& operator=(epoll_scheduler const&) = delete;
83 :
84 : /// Shut down the scheduler, draining pending operations.
85 : void shutdown() override;
86 :
87 : /** Return the epoll file descriptor.
88 :
89 : Used by socket services to register file descriptors
90 : for I/O event notification.
91 :
92 : @return The epoll file descriptor.
93 : */
94 : int epoll_fd() const noexcept
95 : {
96 : return epoll_fd_;
97 : }
98 :
99 : /** Register a descriptor for persistent monitoring.
100 :
101 : The fd is registered once and stays registered until explicitly
102 : deregistered. Events are dispatched via descriptor_state which
103 : tracks pending read/write/connect operations.
104 :
105 : @param fd The file descriptor to register.
106 : @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
107 : */
108 : void register_descriptor(int fd, descriptor_state* desc) const;
109 :
110 : /** Deregister a persistently registered descriptor.
111 :
112 : @param fd The file descriptor to deregister.
113 : */
114 : void deregister_descriptor(int fd) const;
115 :
116 : private:
117 : void
118 : run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
119 : void interrupt_reactor() const override;
120 : void update_timerfd() const;
121 :
122 : int epoll_fd_;
123 : int event_fd_;
124 : int timer_fd_;
125 :
126 : // Edge-triggered eventfd state
127 : mutable std::atomic<bool> eventfd_armed_{false};
128 :
129 : // Set when the earliest timer changes; flushed before epoll_wait
130 : mutable std::atomic<bool> timerfd_stale_{false};
131 : };
132 :
133 HIT 244 : inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
134 244 : : epoll_fd_(-1)
135 244 : , event_fd_(-1)
136 244 : , timer_fd_(-1)
137 : {
138 244 : epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
139 244 : if (epoll_fd_ < 0)
140 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_create1");
141 :
142 HIT 244 : event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
143 244 : if (event_fd_ < 0)
144 : {
145 MIS 0 : int errn = errno;
146 0 : ::close(epoll_fd_);
147 0 : detail::throw_system_error(make_err(errn), "eventfd");
148 : }
149 :
150 HIT 244 : timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
151 244 : if (timer_fd_ < 0)
152 : {
153 MIS 0 : int errn = errno;
154 0 : ::close(event_fd_);
155 0 : ::close(epoll_fd_);
156 0 : detail::throw_system_error(make_err(errn), "timerfd_create");
157 : }
158 :
159 HIT 244 : epoll_event ev{};
160 244 : ev.events = EPOLLIN | EPOLLET;
161 244 : ev.data.ptr = nullptr;
162 244 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
163 : {
164 MIS 0 : int errn = errno;
165 0 : ::close(timer_fd_);
166 0 : ::close(event_fd_);
167 0 : ::close(epoll_fd_);
168 0 : detail::throw_system_error(make_err(errn), "epoll_ctl");
169 : }
170 :
171 HIT 244 : epoll_event timer_ev{};
172 244 : timer_ev.events = EPOLLIN | EPOLLERR;
173 244 : timer_ev.data.ptr = &timer_fd_;
174 244 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
175 : {
176 MIS 0 : int errn = errno;
177 0 : ::close(timer_fd_);
178 0 : ::close(event_fd_);
179 0 : ::close(epoll_fd_);
180 0 : detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
181 : }
182 :
183 HIT 244 : timer_svc_ = &get_timer_service(ctx, *this);
184 244 : timer_svc_->set_on_earliest_changed(
185 4865 : timer_service::callback(this, [](void* p) {
186 4621 : auto* self = static_cast<epoll_scheduler*>(p);
187 4621 : self->timerfd_stale_.store(true, std::memory_order_release);
188 4621 : self->interrupt_reactor();
189 4621 : }));
190 :
191 244 : get_resolver_service(ctx, *this);
192 244 : get_signal_service(ctx, *this);
193 :
194 244 : completed_ops_.push(&task_op_);
195 244 : }
196 :
197 488 : inline epoll_scheduler::~epoll_scheduler()
198 : {
199 244 : if (timer_fd_ >= 0)
200 244 : ::close(timer_fd_);
201 244 : if (event_fd_ >= 0)
202 244 : ::close(event_fd_);
203 244 : if (epoll_fd_ >= 0)
204 244 : ::close(epoll_fd_);
205 488 : }
206 :
207 : inline void
208 244 : epoll_scheduler::shutdown()
209 : {
210 244 : shutdown_drain();
211 :
212 244 : if (event_fd_ >= 0)
213 244 : interrupt_reactor();
214 244 : }
215 :
216 : inline void
217 8886 : epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
218 : {
219 8886 : epoll_event ev{};
220 8886 : ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
221 8886 : ev.data.ptr = desc;
222 :
223 8886 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
224 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
225 :
226 HIT 8886 : desc->registered_events = ev.events;
227 8886 : desc->fd = fd;
228 8886 : desc->scheduler_ = this;
229 8886 : desc->ready_events_.store(0, std::memory_order_relaxed);
230 :
231 8886 : std::lock_guard lock(desc->mutex);
232 8886 : desc->impl_ref_.reset();
233 8886 : desc->read_ready = false;
234 8886 : desc->write_ready = false;
235 8886 : }
236 :
237 : inline void
238 8886 : epoll_scheduler::deregister_descriptor(int fd) const
239 : {
240 8886 : ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
241 8886 : }
242 :
243 : inline void
244 5082 : epoll_scheduler::interrupt_reactor() const
245 : {
246 5082 : bool expected = false;
247 5082 : if (eventfd_armed_.compare_exchange_strong(
248 : expected, true, std::memory_order_release,
249 : std::memory_order_relaxed))
250 : {
251 4915 : std::uint64_t val = 1;
252 4915 : [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
253 : }
254 5082 : }
255 :
256 : inline void
257 9203 : epoll_scheduler::update_timerfd() const
258 : {
259 9203 : auto nearest = timer_svc_->nearest_expiry();
260 :
261 9203 : itimerspec ts{};
262 9203 : int flags = 0;
263 :
264 9203 : if (nearest == timer_service::time_point::max())
265 : {
266 : // No timers — disarm by setting to 0 (relative)
267 : }
268 : else
269 : {
270 9158 : auto now = std::chrono::steady_clock::now();
271 9158 : if (nearest <= now)
272 : {
273 : // Use 1ns instead of 0 — zero disarms the timerfd
274 204 : ts.it_value.tv_nsec = 1;
275 : }
276 : else
277 : {
278 8954 : auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
279 8954 : nearest - now)
280 8954 : .count();
281 8954 : ts.it_value.tv_sec = nsec / 1000000000;
282 8954 : ts.it_value.tv_nsec = nsec % 1000000000;
283 8954 : if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
284 MIS 0 : ts.it_value.tv_nsec = 1;
285 : }
286 : }
287 :
288 HIT 9203 : if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
289 MIS 0 : detail::throw_system_error(make_err(errno), "timerfd_settime");
290 HIT 9203 : }
291 :
292 : inline void
293 40013 : epoll_scheduler::run_task(std::unique_lock<std::mutex>& lock, context_type* ctx)
294 : {
295 40013 : int timeout_ms = task_interrupted_ ? 0 : -1;
296 :
297 40013 : if (lock.owns_lock())
298 13170 : lock.unlock();
299 :
300 40013 : task_cleanup on_exit{this, &lock, ctx};
301 :
302 : // Flush deferred timerfd programming before blocking
303 40013 : if (timerfd_stale_.exchange(false, std::memory_order_acquire))
304 4600 : update_timerfd();
305 :
306 : epoll_event events[128];
307 40013 : int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
308 :
309 40013 : if (nfds < 0 && errno != EINTR)
310 MIS 0 : detail::throw_system_error(make_err(errno), "epoll_wait");
311 :
312 HIT 40013 : bool check_timers = false;
313 40013 : op_queue local_ops;
314 :
315 92518 : for (int i = 0; i < nfds; ++i)
316 : {
317 52505 : if (events[i].data.ptr == nullptr)
318 : {
319 : std::uint64_t val;
320 : // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
321 4671 : [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
322 4671 : eventfd_armed_.store(false, std::memory_order_relaxed);
323 4671 : continue;
324 4671 : }
325 :
326 47834 : if (events[i].data.ptr == &timer_fd_)
327 : {
328 : std::uint64_t expirations;
329 : // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
330 : [[maybe_unused]] auto r =
331 4603 : ::read(timer_fd_, &expirations, sizeof(expirations));
332 4603 : check_timers = true;
333 4603 : continue;
334 4603 : }
335 :
336 43231 : auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
337 43231 : desc->add_ready_events(events[i].events);
338 :
339 43231 : bool expected = false;
340 43231 : if (desc->is_enqueued_.compare_exchange_strong(
341 : expected, true, std::memory_order_release,
342 : std::memory_order_relaxed))
343 : {
344 43231 : local_ops.push(desc);
345 : }
346 : }
347 :
348 40013 : if (check_timers)
349 : {
350 4603 : timer_svc_->process_expired();
351 4603 : update_timerfd();
352 : }
353 :
354 40013 : lock.lock();
355 :
356 40013 : if (!local_ops.empty())
357 26345 : completed_ops_.splice(local_ops);
358 40013 : }
359 :
360 : } // namespace boost::corosio::detail
361 :
362 : #endif // BOOST_COROSIO_HAS_EPOLL
363 :
364 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
|