include/boost/corosio/native/detail/epoll/epoll_scheduler.hpp

83.9% Lines (115/137) 100.0% List of functions (9/9)
f(x) Functions (9)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19
20 #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21
22 #include <boost/corosio/native/detail/epoll/epoll_op.hpp>
23 #include <boost/corosio/detail/timer_service.hpp>
24 #include <boost/corosio/native/detail/make_err.hpp>
25 #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27
28 #include <boost/corosio/detail/except.hpp>
29
30 #include <atomic>
31 #include <chrono>
32 #include <cstdint>
33 #include <mutex>
34
35 #include <errno.h>
36 #include <sys/epoll.h>
37 #include <sys/eventfd.h>
38 #include <sys/timerfd.h>
39 #include <unistd.h>
40
41 namespace boost::corosio::detail {
42
43 struct epoll_op;
44 struct descriptor_state;
45
46 /** Linux scheduler using epoll for I/O multiplexing.
47
48 This scheduler implements the scheduler interface using Linux epoll
49 for efficient I/O event notification. It uses a single reactor model
50 where one thread runs epoll_wait while other threads
51 wait on a condition variable for handler work. This design provides:
52
53 - Handler parallelism: N posted handlers can execute on N threads
54 - No thundering herd: condition_variable wakes exactly one thread
55 - IOCP parity: Behavior matches Windows I/O completion port semantics
56
57 When threads call run(), they first try to execute queued handlers.
58 If the queue is empty and no reactor is running, one thread becomes
59 the reactor and runs epoll_wait. Other threads wait on a condition
60 variable until handlers are available.
61
62 @par Thread Safety
63 All public member functions are thread-safe.
64 */
class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler_base
{
public:
    /** Construct the scheduler.

        Creates an epoll instance, eventfd for reactor interruption,
        and timerfd for kernel-managed timer expiry.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~epoll_scheduler() override;

    epoll_scheduler(epoll_scheduler const&) = delete;
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;

    /// Shut down the scheduler, draining pending operations.
    void shutdown() override;

    /** Return the epoll file descriptor.

        Used by socket services to register file descriptors
        for I/O event notification.

        @return The epoll file descriptor.
    */
    int epoll_fd() const noexcept
    {
        return epoll_fd_;
    }

    /** Register a descriptor for persistent monitoring.

        The fd is registered once and stays registered until explicitly
        deregistered. Events are dispatched via descriptor_state which
        tracks pending read/write/connect operations.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
    */
    void register_descriptor(int fd, descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

private:
    /// Run one reactor iteration: block in epoll_wait and collect ready work.
    void
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;

    /// Wake a thread blocked in epoll_wait by writing to the eventfd.
    void interrupt_reactor() const override;

    /// Program the timerfd to fire at the nearest pending timer expiry.
    void update_timerfd() const;

    int epoll_fd_;  ///< epoll instance fd; -1 before construction completes
    int event_fd_;  ///< eventfd used to interrupt the reactor
    int timer_fd_;  ///< timerfd driving kernel-managed timer expiry

    // Edge-triggered eventfd state: true while a wakeup write is pending
    // and the reactor has not yet drained the eventfd and re-armed.
    mutable std::atomic<bool> eventfd_armed_{false};

    // Set when the earliest timer changes; flushed before epoll_wait
    mutable std::atomic<bool> timerfd_stale_{false};
};
132
133 244x inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int)
134 244x : epoll_fd_(-1)
135 244x , event_fd_(-1)
136 244x , timer_fd_(-1)
137 {
138 244x epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
139 244x if (epoll_fd_ < 0)
140 detail::throw_system_error(make_err(errno), "epoll_create1");
141
142 244x event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
143 244x if (event_fd_ < 0)
144 {
145 int errn = errno;
146 ::close(epoll_fd_);
147 detail::throw_system_error(make_err(errn), "eventfd");
148 }
149
150 244x timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
151 244x if (timer_fd_ < 0)
152 {
153 int errn = errno;
154 ::close(event_fd_);
155 ::close(epoll_fd_);
156 detail::throw_system_error(make_err(errn), "timerfd_create");
157 }
158
159 244x epoll_event ev{};
160 244x ev.events = EPOLLIN | EPOLLET;
161 244x ev.data.ptr = nullptr;
162 244x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
163 {
164 int errn = errno;
165 ::close(timer_fd_);
166 ::close(event_fd_);
167 ::close(epoll_fd_);
168 detail::throw_system_error(make_err(errn), "epoll_ctl");
169 }
170
171 244x epoll_event timer_ev{};
172 244x timer_ev.events = EPOLLIN | EPOLLERR;
173 244x timer_ev.data.ptr = &timer_fd_;
174 244x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
175 {
176 int errn = errno;
177 ::close(timer_fd_);
178 ::close(event_fd_);
179 ::close(epoll_fd_);
180 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
181 }
182
183 244x timer_svc_ = &get_timer_service(ctx, *this);
184 244x timer_svc_->set_on_earliest_changed(
185 4865x timer_service::callback(this, [](void* p) {
186 4621x auto* self = static_cast<epoll_scheduler*>(p);
187 4621x self->timerfd_stale_.store(true, std::memory_order_release);
188 4621x self->interrupt_reactor();
189 4621x }));
190
191 244x get_resolver_service(ctx, *this);
192 244x get_signal_service(ctx, *this);
193
194 244x completed_ops_.push(&task_op_);
195 244x }
196
197 488x inline epoll_scheduler::~epoll_scheduler()
198 {
199 244x if (timer_fd_ >= 0)
200 244x ::close(timer_fd_);
201 244x if (event_fd_ >= 0)
202 244x ::close(event_fd_);
203 244x if (epoll_fd_ >= 0)
204 244x ::close(epoll_fd_);
205 488x }
206
207 inline void
208 244x epoll_scheduler::shutdown()
209 {
210 244x shutdown_drain();
211
212 244x if (event_fd_ >= 0)
213 244x interrupt_reactor();
214 244x }
215
216 inline void
217 8886x epoll_scheduler::register_descriptor(int fd, descriptor_state* desc) const
218 {
219 8886x epoll_event ev{};
220 8886x ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
221 8886x ev.data.ptr = desc;
222
223 8886x if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
224 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
225
226 8886x desc->registered_events = ev.events;
227 8886x desc->fd = fd;
228 8886x desc->scheduler_ = this;
229 8886x desc->ready_events_.store(0, std::memory_order_relaxed);
230
231 8886x std::lock_guard lock(desc->mutex);
232 8886x desc->impl_ref_.reset();
233 8886x desc->read_ready = false;
234 8886x desc->write_ready = false;
235 8886x }
236
237 inline void
238 8886x epoll_scheduler::deregister_descriptor(int fd) const
239 {
240 8886x ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
241 8886x }
242
243 inline void
244 5082x epoll_scheduler::interrupt_reactor() const
245 {
246 5082x bool expected = false;
247 5082x if (eventfd_armed_.compare_exchange_strong(
248 expected, true, std::memory_order_release,
249 std::memory_order_relaxed))
250 {
251 4915x std::uint64_t val = 1;
252 4915x [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
253 }
254 5082x }
255
256 inline void
257 9203x epoll_scheduler::update_timerfd() const
258 {
259 9203x auto nearest = timer_svc_->nearest_expiry();
260
261 9203x itimerspec ts{};
262 9203x int flags = 0;
263
264 9203x if (nearest == timer_service::time_point::max())
265 {
266 // No timers — disarm by setting to 0 (relative)
267 }
268 else
269 {
270 9158x auto now = std::chrono::steady_clock::now();
271 9158x if (nearest <= now)
272 {
273 // Use 1ns instead of 0 — zero disarms the timerfd
274 204x ts.it_value.tv_nsec = 1;
275 }
276 else
277 {
278 8954x auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
279 8954x nearest - now)
280 8954x .count();
281 8954x ts.it_value.tv_sec = nsec / 1000000000;
282 8954x ts.it_value.tv_nsec = nsec % 1000000000;
283 8954x if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
284 ts.it_value.tv_nsec = 1;
285 }
286 }
287
288 9203x if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
289 detail::throw_system_error(make_err(errno), "timerfd_settime");
290 9203x }
291
inline void
epoll_scheduler::run_task(std::unique_lock<std::mutex>& lock, context_type* ctx)
{
    // Poll (timeout 0) if an interrupt was requested; otherwise block
    // indefinitely until an fd becomes ready or we are woken.
    int timeout_ms = task_interrupted_ ? 0 : -1;

    // epoll_wait must not be called while holding the scheduler lock.
    if (lock.owns_lock())
        lock.unlock();

    // RAII: re-establishes scheduler bookkeeping (and the lock contract)
    // on every exit path, including exceptions.
    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    // EINTR is benign (signal delivery); anything else is fatal.
    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;  // collected outside the lock, spliced in at the end

    for (int i = 0; i < nfds; ++i)
    {
        // data.ptr == nullptr identifies the wakeup eventfd: drain it
        // and re-arm so the next interrupt_reactor() writes again.
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        // data.ptr == &timer_fd_ identifies timerfd expiry: drain the
        // expiration count and process timers after the event sweep.
        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection)
            [[maybe_unused]] auto r =
                ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Otherwise data.ptr is a descriptor_state registered via
        // register_descriptor(). Record which events fired, then enqueue
        // the descriptor exactly once (the CAS prevents double-queuing
        // when multiple events arrive before it is processed).
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(
                expected, true, std::memory_order_release,
                std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Fire expired timers and re-program the timerfd for the next one.
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    // Re-acquire the scheduler lock before touching the shared queue.
    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
359
360 } // namespace boost::corosio::detail
361
362 #endif // BOOST_COROSIO_HAS_EPOLL
363
364 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP
365