TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 :
20 : #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp>
21 :
22 : #include <boost/corosio/native/detail/select/select_op.hpp>
23 : #include <boost/corosio/detail/timer_service.hpp>
24 : #include <boost/corosio/native/detail/make_err.hpp>
25 : #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp>
26 : #include <boost/corosio/native/detail/posix/posix_signal_service.hpp>
27 :
28 : #include <boost/corosio/detail/except.hpp>
29 :
30 : #include <sys/select.h>
31 : #include <unistd.h>
32 : #include <errno.h>
33 : #include <fcntl.h>
34 :
35 : #include <atomic>
36 : #include <chrono>
37 : #include <cstdint>
38 : #include <limits>
39 : #include <mutex>
40 : #include <unordered_map>
41 :
42 : namespace boost::corosio::detail {
43 :
44 : struct select_op;
45 : struct select_descriptor_state;
46 :
47 : /** POSIX scheduler using select() for I/O multiplexing.
48 :
49 : This scheduler implements the scheduler interface using the POSIX select()
50 : call for I/O event notification. It inherits the shared reactor threading
51 : model from reactor_scheduler_base: signal state machine, inline completion
52 : budget, work counting, and the do_one event loop.
53 :
54 : The design mirrors epoll_scheduler for behavioral consistency:
55 : - Same single-reactor thread coordination model
56 : - Same deferred I/O pattern (reactor marks ready; workers do I/O)
57 : - Same timer integration pattern
58 :
59 : Known Limitations:
60 : - FD_SETSIZE (~1024) limits maximum concurrent connections
61 : - O(n) scanning: rebuilds fd_sets each iteration
62 : - Level-triggered only (no edge-triggered mode)
63 :
64 : @par Thread Safety
65 : All public member functions are thread-safe.
66 : */
class BOOST_COROSIO_DECL select_scheduler final : public reactor_scheduler_base
{
public:
    /** Construct the scheduler.

        Creates a self-pipe for reactor interruption. Both pipe ends
        are made non-blocking and close-on-exec.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    select_scheduler(capy::execution_context& ctx, int concurrency_hint = -1);

    /// Destroy the scheduler, closing both ends of the self-pipe.
    ~select_scheduler() override;

    // Non-copyable: instances own the self-pipe fds and are referenced
    // by registered descriptor state via scheduler_ back-pointers.
    select_scheduler(select_scheduler const&) = delete;
    select_scheduler& operator=(select_scheduler const&) = delete;

    /// Shut down the scheduler, draining pending operations.
    void shutdown() override;

    /** Return the maximum file descriptor value supported.

        Returns FD_SETSIZE - 1, the maximum fd value that can be
        monitored by select(). Operations with fd >= FD_SETSIZE
        will fail with EINVAL.

        @return The maximum supported file descriptor value.
    */
    static constexpr int max_fd() noexcept
    {
        return FD_SETSIZE - 1;
    }

    /** Register a descriptor for persistent monitoring.

        The fd is added to the registered_descs_ map and will be
        included in subsequent select() calls. The reactor is
        interrupted so a blocked select() rebuilds its fd_sets.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor state for this fd.
    */
    void register_descriptor(int fd, select_descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    /** Interrupt the reactor so it rebuilds its fd_sets.

        Called when a write or connect op is registered after
        the reactor's snapshot was taken. Without this, select()
        may block without watching for writability on the fd.
    */
    void notify_reactor() const;

private:
    void
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) override;
    void interrupt_reactor() const override;
    long calculate_timeout(long requested_timeout_us) const;

    // Self-pipe for interrupting select()
    int pipe_fds_[2]; // [0]=read, [1]=write

    // Per-fd tracking for fd_set building. Mutable because
    // register_descriptor()/deregister_descriptor() are const member
    // functions; both fields are guarded by the base-class mutex_.
    mutable std::unordered_map<int, select_descriptor_state*> registered_descs_;
    mutable int max_fd_ = -1; // highest registered fd, -1 when none
};
139 :
140 HIT 168 : inline select_scheduler::select_scheduler(capy::execution_context& ctx, int)
141 168 : : pipe_fds_{-1, -1}
142 168 : , max_fd_(-1)
143 : {
144 168 : if (::pipe(pipe_fds_) < 0)
145 MIS 0 : detail::throw_system_error(make_err(errno), "pipe");
146 :
147 HIT 504 : for (int i = 0; i < 2; ++i)
148 : {
149 336 : int flags = ::fcntl(pipe_fds_[i], F_GETFL, 0);
150 336 : if (flags == -1)
151 : {
152 MIS 0 : int errn = errno;
153 0 : ::close(pipe_fds_[0]);
154 0 : ::close(pipe_fds_[1]);
155 0 : detail::throw_system_error(make_err(errn), "fcntl F_GETFL");
156 : }
157 HIT 336 : if (::fcntl(pipe_fds_[i], F_SETFL, flags | O_NONBLOCK) == -1)
158 : {
159 MIS 0 : int errn = errno;
160 0 : ::close(pipe_fds_[0]);
161 0 : ::close(pipe_fds_[1]);
162 0 : detail::throw_system_error(make_err(errn), "fcntl F_SETFL");
163 : }
164 HIT 336 : if (::fcntl(pipe_fds_[i], F_SETFD, FD_CLOEXEC) == -1)
165 : {
166 MIS 0 : int errn = errno;
167 0 : ::close(pipe_fds_[0]);
168 0 : ::close(pipe_fds_[1]);
169 0 : detail::throw_system_error(make_err(errn), "fcntl F_SETFD");
170 : }
171 : }
172 :
173 HIT 168 : timer_svc_ = &get_timer_service(ctx, *this);
174 168 : timer_svc_->set_on_earliest_changed(
175 3475 : timer_service::callback(this, [](void* p) {
176 3307 : static_cast<select_scheduler*>(p)->interrupt_reactor();
177 3307 : }));
178 :
179 168 : get_resolver_service(ctx, *this);
180 168 : get_signal_service(ctx, *this);
181 :
182 168 : completed_ops_.push(&task_op_);
183 168 : }
184 :
185 336 : inline select_scheduler::~select_scheduler()
186 : {
187 168 : if (pipe_fds_[0] >= 0)
188 168 : ::close(pipe_fds_[0]);
189 168 : if (pipe_fds_[1] >= 0)
190 168 : ::close(pipe_fds_[1]);
191 336 : }
192 :
193 : inline void
194 168 : select_scheduler::shutdown()
195 : {
196 168 : shutdown_drain();
197 :
198 168 : if (pipe_fds_[1] >= 0)
199 168 : interrupt_reactor();
200 168 : }
201 :
202 : inline void
203 6242 : select_scheduler::register_descriptor(
204 : int fd, select_descriptor_state* desc) const
205 : {
206 6242 : if (fd < 0 || fd >= FD_SETSIZE)
207 MIS 0 : detail::throw_system_error(make_err(EINVAL), "select: fd out of range");
208 :
209 HIT 6242 : desc->registered_events = reactor_event_read | reactor_event_write;
210 6242 : desc->fd = fd;
211 6242 : desc->scheduler_ = this;
212 6242 : desc->ready_events_.store(0, std::memory_order_relaxed);
213 :
214 : {
215 6242 : std::lock_guard lock(desc->mutex);
216 6242 : desc->impl_ref_.reset();
217 6242 : desc->read_ready = false;
218 6242 : desc->write_ready = false;
219 6242 : }
220 :
221 : {
222 6242 : std::lock_guard lock(mutex_);
223 6242 : registered_descs_[fd] = desc;
224 6242 : if (fd > max_fd_)
225 6238 : max_fd_ = fd;
226 6242 : }
227 :
228 6242 : interrupt_reactor();
229 6242 : }
230 :
231 : inline void
232 6242 : select_scheduler::deregister_descriptor(int fd) const
233 : {
234 6242 : std::lock_guard lock(mutex_);
235 :
236 6242 : auto it = registered_descs_.find(fd);
237 6242 : if (it == registered_descs_.end())
238 MIS 0 : return;
239 :
240 HIT 6242 : registered_descs_.erase(it);
241 :
242 6242 : if (fd == max_fd_)
243 : {
244 6192 : max_fd_ = pipe_fds_[0];
245 12324 : for (auto& [registered_fd, state] : registered_descs_)
246 : {
247 6132 : if (registered_fd > max_fd_)
248 6123 : max_fd_ = registered_fd;
249 : }
250 : }
251 6242 : }
252 :
inline void
select_scheduler::notify_reactor() const
{
    // Public wake-up hook for op-registration paths: delegates to the
    // private interrupt_reactor() override so a blocked select()
    // returns and rebuilds its fd_sets with the latest registrations.
    interrupt_reactor();
}
258 :
259 : inline void
260 38147 : select_scheduler::interrupt_reactor() const
261 : {
262 38147 : char byte = 1;
263 38147 : [[maybe_unused]] auto r = ::write(pipe_fds_[1], &byte, 1);
264 38147 : }
265 :
266 : inline long
267 136371 : select_scheduler::calculate_timeout(long requested_timeout_us) const
268 : {
269 136371 : if (requested_timeout_us == 0)
270 MIS 0 : return 0;
271 :
272 HIT 136371 : auto nearest = timer_svc_->nearest_expiry();
273 136371 : if (nearest == timer_service::time_point::max())
274 46 : return requested_timeout_us;
275 :
276 136325 : auto now = std::chrono::steady_clock::now();
277 136325 : if (nearest <= now)
278 334 : return 0;
279 :
280 : auto timer_timeout_us =
281 135991 : std::chrono::duration_cast<std::chrono::microseconds>(nearest - now)
282 135991 : .count();
283 :
284 135991 : constexpr auto long_max =
285 : static_cast<long long>((std::numeric_limits<long>::max)());
286 : auto capped_timer_us =
287 135991 : (std::min)((std::max)(static_cast<long long>(timer_timeout_us),
288 135991 : static_cast<long long>(0)),
289 135991 : long_max);
290 :
291 135991 : if (requested_timeout_us < 0)
292 135991 : return static_cast<long>(capped_timer_us);
293 :
294 : return static_cast<long>(
295 MIS 0 : (std::min)(static_cast<long long>(requested_timeout_us),
296 0 : capped_timer_us));
297 : }
298 :
// One reactor iteration: snapshot the watched fds, block in select()
// (bounded by the nearest timer expiry), then mark ready descriptors
// and hand them to worker threads via completed_ops_. Returning early
// (EINTR/EBADF) simply causes the caller to re-enter with a fresh
// snapshot; task_cleanup re-arms the task op on every exit path.
inline void
select_scheduler::run_task(
    std::unique_lock<std::mutex>& lock, context_type* ctx)
{
    // When an interrupt is pending, poll (0); otherwise wait until the
    // nearest timer (-1 = no caller-imposed bound).
    long effective_timeout_us = task_interrupted_ ? 0 : calculate_timeout(-1);

    // Snapshot registered descriptors while holding lock.
    // Record which fds need write monitoring to avoid a hot loop:
    // select is level-triggered so writable sockets (nearly always
    // writable) would cause select() to return immediately every
    // iteration if unconditionally added to write_fds.
    struct fd_entry
    {
        int fd;
        select_descriptor_state* desc;
        bool needs_write;
    };
    fd_entry snapshot[FD_SETSIZE];
    int snapshot_count = 0;

    // NOTE(review): lock.owns_lock() is only tested AFTER this loop,
    // so when the caller passes an unlocked `lock` this iterates
    // registered_descs_ without mutex_ held — confirm callers
    // guarantee no concurrent register/deregister in that state.
    for (auto& [fd, desc] : registered_descs_)
    {
        if (snapshot_count < FD_SETSIZE)
        {
            std::lock_guard desc_lock(desc->mutex);
            snapshot[snapshot_count].fd = fd;
            snapshot[snapshot_count].desc = desc;
            snapshot[snapshot_count].needs_write =
                (desc->write_op || desc->connect_op);
            ++snapshot_count;
        }
    }

    // Drop the scheduler lock before blocking in select().
    if (lock.owns_lock())
        lock.unlock();

    task_cleanup on_exit{this, &lock, ctx};

    // Rebuild the fd_sets from the snapshot each iteration.
    fd_set read_fds, write_fds, except_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&write_fds);
    FD_ZERO(&except_fds);

    // The self-pipe read end is always watched so interrupt_reactor()
    // can wake a blocked select().
    FD_SET(pipe_fds_[0], &read_fds);
    int nfds = pipe_fds_[0];

    for (int i = 0; i < snapshot_count; ++i)
    {
        int fd = snapshot[i].fd;
        FD_SET(fd, &read_fds);
        if (snapshot[i].needs_write)
            FD_SET(fd, &write_fds);
        FD_SET(fd, &except_fds);
        if (fd > nfds)
            nfds = fd;
    }

    // Negative timeout means block indefinitely (null timeval).
    struct timeval tv;
    struct timeval* tv_ptr = nullptr;
    if (effective_timeout_us >= 0)
    {
        tv.tv_sec = effective_timeout_us / 1000000;
        tv.tv_usec = effective_timeout_us % 1000000;
        tv_ptr = &tv;
    }

    int ready = ::select(nfds + 1, &read_fds, &write_fds, &except_fds, tv_ptr);

    // EINTR: signal interrupted select(), just retry.
    // EBADF: an fd was closed between snapshot and select(); retry
    // with a fresh snapshot from registered_descs_.
    if (ready < 0)
    {
        if (errno == EINTR || errno == EBADF)
            return;
        detail::throw_system_error(make_err(errno), "select");
    }

    // Process timers outside the lock
    timer_svc_->process_expired();

    op_queue local_ops;

    if (ready > 0)
    {
        // Drain the self-pipe so coalesced wake-ups don't make the
        // next select() return immediately.
        if (FD_ISSET(pipe_fds_[0], &read_fds))
        {
            char buf[256];
            while (::read(pipe_fds_[0], buf, sizeof(buf)) > 0)
            {
            }
        }

        for (int i = 0; i < snapshot_count; ++i)
        {
            int fd = snapshot[i].fd;
            select_descriptor_state* desc = snapshot[i].desc;

            // Translate fd_set membership into reactor event flags.
            std::uint32_t flags = 0;
            if (FD_ISSET(fd, &read_fds))
                flags |= reactor_event_read;
            if (FD_ISSET(fd, &write_fds))
                flags |= reactor_event_write;
            if (FD_ISSET(fd, &except_fds))
                flags |= reactor_event_error;

            if (flags == 0)
                continue;

            desc->add_ready_events(flags);

            // Enqueue each ready descriptor at most once; the CAS on
            // is_enqueued_ guards against double-queuing while a
            // worker is still processing it.
            bool expected = false;
            if (desc->is_enqueued_.compare_exchange_strong(
                expected, true, std::memory_order_release,
                std::memory_order_relaxed))
            {
                local_ops.push(desc);
            }
        }
    }

    // Re-acquire the scheduler lock before touching shared queues.
    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
425 :
426 : } // namespace boost::corosio::detail
427 :
428 : #endif // BOOST_COROSIO_HAS_SELECT
429 :
430 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SCHEDULER_HPP
|