include/boost/corosio/native/detail/reactor/reactor_scheduler.hpp

75.9% Lines (233/307) 81.0% List of functions (34/42)
f(x) Functions (42)
Function Calls Lines Branches Blocks
boost::corosio::detail::reactor_scheduler_context::reactor_scheduler_context(boost::corosio::detail::reactor_scheduler_base const*, boost::corosio::detail::reactor_scheduler_context*) :65 0 100.0% boost::corosio::detail::reactor_find_context(boost::corosio::detail::reactor_scheduler_base const*) :82 0 100.0% boost::corosio::detail::reactor_flush_private_work(boost::corosio::detail::reactor_scheduler_context*, std::atomic<long>&) :94 0 0.0% boost::corosio::detail::reactor_drain_private_queue(boost::corosio::detail::reactor_scheduler_context*, std::atomic<long>&, boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) :111 0 0.0% boost::corosio::detail::reactor_scheduler_base::reactor_scheduler_base() :237 0 100.0% boost::corosio::detail::reactor_scheduler_base::task_op::operator()() :275 0 0.0% boost::corosio::detail::reactor_scheduler_base::task_op::destroy() :276 0 0.0% boost::corosio::detail::reactor_thread_context_guard::reactor_thread_context_guard(boost::corosio::detail::reactor_scheduler_base const*) :321 0 100.0% boost::corosio::detail::reactor_thread_context_guard::~reactor_thread_context_guard() :329 0 66.7% boost::corosio::detail::reactor_scheduler_base::reset_inline_budget() const :341 0 54.5% boost::corosio::detail::reactor_scheduler_base::try_consume_inline_budget() const :362 0 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) const :376 0 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) const::post_handler::post_handler(std::__n4861::coroutine_handle<void>) :382 0 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) const::post_handler::~post_handler() :383 0 100.0% boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) const::post_handler::operator()() :385 0 100.0% 
boost::corosio::detail::reactor_scheduler_base::post(std::__n4861::coroutine_handle<void>) const::post_handler::destroy() :394 0 100.0% boost::corosio::detail::reactor_scheduler_base::post(boost::corosio::detail::scheduler_op*) const :419 0 100.0% boost::corosio::detail::reactor_scheduler_base::running_in_this_thread() const :436 0 100.0% boost::corosio::detail::reactor_scheduler_base::stop() :442 0 100.0% boost::corosio::detail::reactor_scheduler_base::stopped() const :454 0 100.0% boost::corosio::detail::reactor_scheduler_base::restart() :461 0 100.0% boost::corosio::detail::reactor_scheduler_base::run() :468 0 100.0% boost::corosio::detail::reactor_scheduler_base::run_one() :493 0 75.0% boost::corosio::detail::reactor_scheduler_base::wait_one(long) :507 0 100.0% boost::corosio::detail::reactor_scheduler_base::poll() :521 0 100.0% boost::corosio::detail::reactor_scheduler_base::poll_one() :546 0 100.0% boost::corosio::detail::reactor_scheduler_base::work_started() :560 0 100.0% boost::corosio::detail::reactor_scheduler_base::work_finished() :566 0 100.0% boost::corosio::detail::reactor_scheduler_base::compensating_work_started() const :573 0 100.0% boost::corosio::detail::reactor_scheduler_base::drain_thread_queue(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&, long) const :581 0 0.0% boost::corosio::detail::reactor_scheduler_base::post_deferred_completions(boost::corosio::detail::intrusive_queue<boost::corosio::detail::scheduler_op>&) const :594 0 30.0% boost::corosio::detail::reactor_scheduler_base::shutdown_drain() :611 0 100.0% boost::corosio::detail::reactor_scheduler_base::signal_all(std::unique_lock<std::mutex>&) const :628 0 100.0% boost::corosio::detail::reactor_scheduler_base::maybe_unlock_and_signal_one(std::unique_lock<std::mutex>&) const :635 0 57.1% boost::corosio::detail::reactor_scheduler_base::unlock_and_signal_one(std::unique_lock<std::mutex>&) const :649 0 85.7% 
boost::corosio::detail::reactor_scheduler_base::clear_signal() const :661 0 0.0% boost::corosio::detail::reactor_scheduler_base::wait_for_signal(std::unique_lock<std::mutex>&) const :667 0 0.0% boost::corosio::detail::reactor_scheduler_base::wait_for_signal_for(std::unique_lock<std::mutex>&, long) const :679 0 0.0% boost::corosio::detail::reactor_scheduler_base::wake_one_thread_and_unlock(std::unique_lock<std::mutex>&) const :691 0 87.5% boost::corosio::detail::reactor_scheduler_base::work_cleanup::~work_cleanup() :709 0 92.3% boost::corosio::detail::reactor_scheduler_base::task_cleanup::~task_cleanup() :733 0 83.3% boost::corosio::detail::reactor_scheduler_base::do_one(std::unique_lock<std::mutex>&, long, boost::corosio::detail::reactor_scheduler_context*) :754 0 66.7%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/capy/ex/execution_context.hpp>
15
16 #include <boost/corosio/native/native_scheduler.hpp>
17 #include <boost/corosio/detail/scheduler_op.hpp>
18 #include <boost/corosio/detail/thread_local_ptr.hpp>
19
20 #include <atomic>
21 #include <chrono>
22 #include <condition_variable>
23 #include <coroutine>
24 #include <cstddef>
25 #include <cstdint>
26 #include <limits>
27 #include <memory>
28 #include <mutex>
29
30 namespace boost::corosio::detail {
31
32 // Forward declaration
33 class reactor_scheduler_base;
34
/** Per-thread state for a reactor scheduler.

    Each thread running a scheduler's event loop has one of these
    on a thread-local stack. It holds a private work queue and
    inline completion budget for speculative I/O fast paths.

    NOTE(review): members appear to be touched only by the owning
    thread; transfers to the global queue go through paths that take
    the scheduler mutex (drain_thread_queue, task/work cleanup) —
    confirm before adding cross-thread access.
*/
struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
{
    /// Scheduler this context belongs to (stack may hold frames for
    /// several schedulers nested on one thread).
    reactor_scheduler_base const* key;

    /// Next context frame on this thread's stack.
    reactor_scheduler_context* next;

    /// Private work queue for reduced contention.
    op_queue private_queue;

    /// Unflushed work count for the private queue; flushed to the
    /// scheduler's global atomic counter in batches.
    std::int64_t private_outstanding_work;

    /// Remaining inline completions allowed this cycle.
    int inline_budget;

    /// Maximum inline budget (adaptive, 2-16; see reset_inline_budget).
    int inline_budget_max;

    /// True if no other thread absorbed queued work last cycle.
    bool unassisted;

    /// Construct a context frame linked to @a n.
    reactor_scheduler_context(
        reactor_scheduler_base const* k, reactor_scheduler_context* n)
        : key(k)
        , next(n)
        , private_outstanding_work(0)
        , inline_budget(0)
        , inline_budget_max(2)
        , unassisted(false)
    {
    }
};
76
77 /// Thread-local context stack for reactor schedulers.
78 inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
79
80 /// Find the context frame for a scheduler on this thread.
81 inline reactor_scheduler_context*
82 914875x reactor_find_context(reactor_scheduler_base const* self) noexcept
83 {
84 914875x for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
85 {
86 912387x if (c->key == self)
87 912387x return c;
88 }
89 2488x return nullptr;
90 }
91
92 /// Flush private work count to global counter.
93 inline void
94 reactor_flush_private_work(
95 reactor_scheduler_context* ctx,
96 std::atomic<std::int64_t>& outstanding_work) noexcept
97 {
98 if (ctx && ctx->private_outstanding_work > 0)
99 {
100 outstanding_work.fetch_add(
101 ctx->private_outstanding_work, std::memory_order_relaxed);
102 ctx->private_outstanding_work = 0;
103 }
104 }
105
106 /** Drain private queue to global queue, flushing work count first.
107
108 @return True if any ops were drained.
109 */
110 inline bool
111 reactor_drain_private_queue(
112 reactor_scheduler_context* ctx,
113 std::atomic<std::int64_t>& outstanding_work,
114 op_queue& completed_ops) noexcept
115 {
116 if (!ctx || ctx->private_queue.empty())
117 return false;
118
119 reactor_flush_private_work(ctx, outstanding_work);
120 completed_ops.splice(ctx->private_queue);
121 return true;
122 }
123
/** Non-template base for reactor-backed scheduler implementations.

    Provides the complete threading model shared by epoll, kqueue,
    and select schedulers: signal state machine, inline completion
    budget, work counting, run/poll methods, and the do_one event
    loop.

    Derived classes provide platform-specific hooks by overriding:
    - `run_task(lock, ctx)` to run the reactor poll
    - `interrupt_reactor()` to wake a blocked reactor

    De-templated from the original CRTP design to eliminate
    duplicate instantiations when multiple backends are compiled
    into the same binary. Virtual dispatch for run_task (called
    once per reactor cycle, before a blocking syscall) has
    negligible overhead.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class reactor_scheduler_base
    : public native_scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;
    using context_type = reactor_scheduler_context;

    /// Post a coroutine for deferred execution.
    void post(std::coroutine_handle<> h) const override;

    /// Post a scheduler operation for deferred execution.
    void post(scheduler_op* h) const override;

    /// Return true if called from a thread running this scheduler.
    bool running_in_this_thread() const noexcept override;

    /// Request the scheduler to stop dispatching handlers.
    void stop() override;

    /// Return true if the scheduler has been stopped.
    bool stopped() const noexcept override;

    /// Reset the stopped state so `run()` can resume.
    void restart() override;

    /// Run the event loop until no work remains.
    std::size_t run() override;

    /// Run until one handler completes or no work remains.
    std::size_t run_one() override;

    /// Run until one handler completes or @a usec elapses.
    std::size_t wait_one(long usec) override;

    /// Run ready handlers without blocking.
    std::size_t poll() override;

    /// Run at most one ready handler without blocking.
    std::size_t poll_one() override;

    /// Increment the outstanding work count.
    void work_started() noexcept override;

    /// Decrement the outstanding work count, stopping on zero.
    void work_finished() noexcept override;

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and
        no handler will be executed. Must be called from a scheduler
        thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Flushes private work count to the global counter, then
        transfers the queue under mutex protection.

        @param queue The private queue to drain.
        @param count Private work count to flush before draining.
    */
    void drain_thread_queue(op_queue& queue, std::int64_t count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations
        go to the thread's private queue (fast path). Otherwise,
        operations are added to the global queue under mutex and a
        waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

protected:
    reactor_scheduler_base() = default;

    /** Drain completed_ops during shutdown.

        Pops all operations from the global queue and destroys them,
        skipping the task sentinel. Signals all waiting threads.
        Derived classes call this from their shutdown() override
        before performing platform-specific cleanup.
    */
    void shutdown_drain();

    /// RAII guard that re-inserts the task sentinel after `run_task`.
    struct task_cleanup
    {
        reactor_scheduler_base const* sched;
        std::unique_lock<std::mutex>* lock;
        context_type* ctx;
        ~task_cleanup();
    };

    mutable std::mutex mutex_;
    mutable std::condition_variable cond_;
    // Global completion queue; guarded by mutex_.
    mutable op_queue completed_ops_;
    // Global outstanding-work counter; private per-thread deltas are
    // flushed into it in batches.
    mutable std::atomic<std::int64_t> outstanding_work_{0};
    // Guarded by mutex_ (read/written only with the lock held).
    bool stopped_ = false;
    mutable std::atomic<bool> task_running_{false};
    // Guarded by mutex_; true while a wake-up of the reactor is pending.
    mutable bool task_interrupted_ = false;

    /// Bit 0 of `state_`: set when the condvar should be signaled.
    static constexpr std::size_t signaled_bit = 1;

    /// Increment per waiting thread in `state_`.
    static constexpr std::size_t waiter_increment = 2;
    // Signal state machine: bit 0 = signaled, remaining bits count
    // waiters (in units of waiter_increment). Guarded by mutex_.
    mutable std::size_t state_ = 0;

    /// Sentinel op that triggers a reactor poll when dequeued.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;

    /// Run the platform-specific reactor poll.
    virtual void
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) = 0;

    /// Wake a blocked reactor (e.g. write to eventfd or pipe).
    virtual void interrupt_reactor() const = 0;

private:
    /// RAII guard that settles work accounting after a handler runs.
    struct work_cleanup
    {
        reactor_scheduler_base* sched;
        std::unique_lock<std::mutex>* lock;
        context_type* ctx;
        ~work_cleanup();
    };

    std::size_t do_one(
        std::unique_lock<std::mutex>& lock, long timeout_us, context_type* ctx);

    // All signal helpers require mutex_ to be held on entry (the
    // unique_lock parameter documents this precondition).
    void signal_all(std::unique_lock<std::mutex>& lock) const;
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
    void clear_signal() const;
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
    void wait_for_signal_for(
        std::unique_lock<std::mutex>& lock, long timeout_us) const;
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
};
308
/** RAII guard that pushes/pops a scheduler context frame.

    On construction, pushes a new context frame onto the
    thread-local stack. On destruction, drains any remaining
    private queue items to the global queue and pops the frame.
*/
struct reactor_thread_context_guard
{
    /// The context frame managed by this guard.
    reactor_scheduler_context frame_;

    /// Construct the guard, pushing a frame for @a sched.
    explicit reactor_thread_context_guard(
        reactor_scheduler_base const* sched) noexcept
        : frame_(sched, reactor_context_stack.get())
    {
        reactor_context_stack.set(&frame_);
    }

    /// Destroy the guard, draining private work and popping the frame.
    ~reactor_thread_context_guard() noexcept
    {
        // drain_thread_queue takes the scheduler mutex; only called
        // when ops remain so the common path stays lock-free.
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(
                frame_.private_queue, frame_.private_outstanding_work);
        reactor_context_stack.set(frame_.next);
    }
};
337
338 // ---- Inline implementations ------------------------------------------------
339
340 inline void
341 116654x reactor_scheduler_base::reset_inline_budget() const noexcept
342 {
343 116654x if (auto* ctx = reactor_find_context(this))
344 {
345 // Cap when no other thread absorbed queued work
346 116654x if (ctx->unassisted)
347 {
348 116654x ctx->inline_budget_max = 4;
349 116654x ctx->inline_budget = 4;
350 116654x return;
351 }
352 // Ramp up when previous cycle fully consumed budget
353 if (ctx->inline_budget == 0)
354 ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
355 else if (ctx->inline_budget < ctx->inline_budget_max)
356 ctx->inline_budget_max = 2;
357 ctx->inline_budget = ctx->inline_budget_max;
358 }
359 }
360
361 inline bool
362 506663x reactor_scheduler_base::try_consume_inline_budget() const noexcept
363 {
364 506663x if (auto* ctx = reactor_find_context(this))
365 {
366 506663x if (ctx->inline_budget > 0)
367 {
368 405381x --ctx->inline_budget;
369 405381x return true;
370 }
371 }
372 101282x return false;
373 }
374
375 inline void
376 10133x reactor_scheduler_base::post(std::coroutine_handle<> h) const
377 {
378 struct post_handler final : scheduler_op
379 {
380 std::coroutine_handle<> h_;
381
382 10133x explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
383 20266x ~post_handler() override = default;
384
385 10124x void operator()() override
386 {
387 10124x auto saved = h_;
388 10124x delete this;
389 // Ensure stores from the posting thread are visible
390 std::atomic_thread_fence(std::memory_order_acquire);
391 10124x saved.resume();
392 10124x }
393
394 9x void destroy() override
395 {
396 9x auto saved = h_;
397 9x delete this;
398 9x saved.destroy();
399 9x }
400 };
401
402 10133x auto ph = std::make_unique<post_handler>(h);
403
404 10133x if (auto* ctx = reactor_find_context(this))
405 {
406 8195x ++ctx->private_outstanding_work;
407 8195x ctx->private_queue.push(ph.release());
408 8195x return;
409 }
410
411 1938x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
412
413 1938x std::unique_lock lock(mutex_);
414 1938x completed_ops_.push(ph.release());
415 1938x wake_one_thread_and_unlock(lock);
416 10133x }
417
418 inline void
419 109609x reactor_scheduler_base::post(scheduler_op* h) const
420 {
421 109609x if (auto* ctx = reactor_find_context(this))
422 {
423 109575x ++ctx->private_outstanding_work;
424 109575x ctx->private_queue.push(h);
425 109575x return;
426 }
427
428 34x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
429
430 34x std::unique_lock lock(mutex_);
431 34x completed_ops_.push(h);
432 34x wake_one_thread_and_unlock(lock);
433 34x }
434
435 inline bool
436 1118x reactor_scheduler_base::running_in_this_thread() const noexcept
437 {
438 1118x return reactor_find_context(this) != nullptr;
439 }
440
441 inline void
442 353x reactor_scheduler_base::stop()
443 {
444 353x std::unique_lock lock(mutex_);
445 353x if (!stopped_)
446 {
447 317x stopped_ = true;
448 317x signal_all(lock);
449 317x interrupt_reactor();
450 }
451 353x }
452
453 inline bool
454 21x reactor_scheduler_base::stopped() const noexcept
455 {
456 21x std::unique_lock lock(mutex_);
457 42x return stopped_;
458 21x }
459
460 inline void
461 91x reactor_scheduler_base::restart()
462 {
463 91x std::unique_lock lock(mutex_);
464 91x stopped_ = false;
465 91x }
466
467 inline std::size_t
468 331x reactor_scheduler_base::run()
469 {
470 662x if (outstanding_work_.load(std::memory_order_acquire) == 0)
471 {
472 28x stop();
473 28x return 0;
474 }
475
476 303x reactor_thread_context_guard ctx(this);
477 303x std::unique_lock lock(mutex_);
478
479 303x std::size_t n = 0;
480 for (;;)
481 {
482 305727x if (!do_one(lock, -1, &ctx.frame_))
483 303x break;
484 305424x if (n != (std::numeric_limits<std::size_t>::max)())
485 305424x ++n;
486 305424x if (!lock.owns_lock())
487 195594x lock.lock();
488 }
489 303x return n;
490 303x }
491
492 inline std::size_t
493 2x reactor_scheduler_base::run_one()
494 {
495 4x if (outstanding_work_.load(std::memory_order_acquire) == 0)
496 {
497 stop();
498 return 0;
499 }
500
501 2x reactor_thread_context_guard ctx(this);
502 2x std::unique_lock lock(mutex_);
503 2x return do_one(lock, -1, &ctx.frame_);
504 2x }
505
506 inline std::size_t
507 61x reactor_scheduler_base::wait_one(long usec)
508 {
509 122x if (outstanding_work_.load(std::memory_order_acquire) == 0)
510 {
511 10x stop();
512 10x return 0;
513 }
514
515 51x reactor_thread_context_guard ctx(this);
516 51x std::unique_lock lock(mutex_);
517 51x return do_one(lock, usec, &ctx.frame_);
518 51x }
519
520 inline std::size_t
521 6x reactor_scheduler_base::poll()
522 {
523 12x if (outstanding_work_.load(std::memory_order_acquire) == 0)
524 {
525 1x stop();
526 1x return 0;
527 }
528
529 5x reactor_thread_context_guard ctx(this);
530 5x std::unique_lock lock(mutex_);
531
532 5x std::size_t n = 0;
533 for (;;)
534 {
535 11x if (!do_one(lock, 0, &ctx.frame_))
536 5x break;
537 6x if (n != (std::numeric_limits<std::size_t>::max)())
538 6x ++n;
539 6x if (!lock.owns_lock())
540 6x lock.lock();
541 }
542 5x return n;
543 5x }
544
545 inline std::size_t
546 4x reactor_scheduler_base::poll_one()
547 {
548 8x if (outstanding_work_.load(std::memory_order_acquire) == 0)
549 {
550 2x stop();
551 2x return 0;
552 }
553
554 2x reactor_thread_context_guard ctx(this);
555 2x std::unique_lock lock(mutex_);
556 2x return do_one(lock, 0, &ctx.frame_);
557 2x }
558
559 inline void
560 24478x reactor_scheduler_base::work_started() noexcept
561 {
562 24478x outstanding_work_.fetch_add(1, std::memory_order_relaxed);
563 24478x }
564
565 inline void
566 34351x reactor_scheduler_base::work_finished() noexcept
567 {
568 68702x if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
569 309x stop();
570 34351x }
571
572 inline void
573 170698x reactor_scheduler_base::compensating_work_started() const noexcept
574 {
575 170698x auto* ctx = reactor_find_context(this);
576 170698x if (ctx)
577 170698x ++ctx->private_outstanding_work;
578 170698x }
579
580 inline void
581 reactor_scheduler_base::drain_thread_queue(
582 op_queue& queue, std::int64_t count) const
583 {
584 if (count > 0)
585 outstanding_work_.fetch_add(count, std::memory_order_relaxed);
586
587 std::unique_lock lock(mutex_);
588 completed_ops_.splice(queue);
589 if (count > 0)
590 maybe_unlock_and_signal_one(lock);
591 }
592
593 inline void
594 15062x reactor_scheduler_base::post_deferred_completions(op_queue& ops) const
595 {
596 15062x if (ops.empty())
597 15062x return;
598
599 if (auto* ctx = reactor_find_context(this))
600 {
601 ctx->private_queue.splice(ops);
602 return;
603 }
604
605 std::unique_lock lock(mutex_);
606 completed_ops_.splice(ops);
607 wake_one_thread_and_unlock(lock);
608 }
609
610 inline void
611 412x reactor_scheduler_base::shutdown_drain()
612 {
613 412x std::unique_lock lock(mutex_);
614
615 893x while (auto* h = completed_ops_.pop())
616 {
617 481x if (h == &task_op_)
618 412x continue;
619 69x lock.unlock();
620 69x h->destroy();
621 69x lock.lock();
622 481x }
623
624 412x signal_all(lock);
625 412x }
626
inline void
reactor_scheduler_base::signal_all(std::unique_lock<std::mutex>&) const
{
    // Precondition: mutex_ held (the lock parameter documents this);
    // state_ may only be modified under the mutex.
    state_ |= signaled_bit;
    cond_.notify_all();
}
633
634 inline bool
635 1972x reactor_scheduler_base::maybe_unlock_and_signal_one(
636 std::unique_lock<std::mutex>& lock) const
637 {
638 1972x state_ |= signaled_bit;
639 1972x if (state_ > signaled_bit)
640 {
641 lock.unlock();
642 cond_.notify_one();
643 return true;
644 }
645 1972x return false;
646 }
647
648 inline bool
649 357914x reactor_scheduler_base::unlock_and_signal_one(
650 std::unique_lock<std::mutex>& lock) const
651 {
652 357914x state_ |= signaled_bit;
653 357914x bool have_waiters = state_ > signaled_bit;
654 357914x lock.unlock();
655 357914x if (have_waiters)
656 cond_.notify_one();
657 357914x return have_waiters;
658 }
659
inline void
reactor_scheduler_base::clear_signal() const
{
    // Precondition: mutex_ held (called from do_one before blocking).
    // Clears only the signaled bit; waiter counts are preserved.
    state_ &= ~signaled_bit;
}
665
inline void
reactor_scheduler_base::wait_for_signal(
    std::unique_lock<std::mutex>& lock) const
{
    // Precondition: mutex_ held. Blocks until the signaled bit is set.
    // The waiter count lives in the upper bits of state_ so signalers
    // can tell whether notify_one will reach anyone; it is bumped only
    // for the duration of each wait (spurious wakeups re-check).
    while ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait(lock);
        state_ -= waiter_increment;
    }
}
677
inline void
reactor_scheduler_base::wait_for_signal_for(
    std::unique_lock<std::mutex>& lock, long timeout_us) const
{
    // Precondition: mutex_ held. Single timed wait, deliberately not a
    // loop: a spurious wakeup or timeout simply returns early, which
    // is acceptable for a bounded wait (caller re-checks conditions).
    if ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= waiter_increment;
    }
}
689
690 inline void
691 1972x reactor_scheduler_base::wake_one_thread_and_unlock(
692 std::unique_lock<std::mutex>& lock) const
693 {
694 1972x if (maybe_unlock_and_signal_one(lock))
695 return;
696
697 1972x if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
698 {
699 26x task_interrupted_ = true;
700 26x lock.unlock();
701 26x interrupt_reactor();
702 }
703 else
704 {
705 1946x lock.unlock();
706 }
707 }
708
inline reactor_scheduler_base::work_cleanup::~work_cleanup()
{
    // Runs after a handler completes (lock released). The handler
    // consumed one unit of work, so the net adjustment to the global
    // counter is (produced - 1).
    if (ctx)
    {
        std::int64_t produced = ctx->private_outstanding_work;
        if (produced > 1)
            // Handler produced more work than it consumed: publish surplus.
            sched->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            // Nothing replaced the consumed unit: retire it (this may
            // stop the scheduler if it was the last outstanding unit).
            sched->work_finished();
        // produced == 1 nets to zero: no counter traffic at all.
        ctx->private_outstanding_work = 0;

        if (!ctx->private_queue.empty())
        {
            // Hand ops produced by the handler to the global queue;
            // requires re-acquiring the scheduler lock.
            lock->lock();
            sched->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        // No thread context: just retire the consumed unit.
        sched->work_finished();
    }
}
732
inline reactor_scheduler_base::task_cleanup::~task_cleanup()
{
    // Runs after the platform reactor poll (run_task) returns; unlike
    // work_cleanup there is no implicit "consumed one unit", so the
    // whole private count is flushed as-is.
    if (!ctx)
        return;

    if (ctx->private_outstanding_work > 0)
    {
        sched->outstanding_work_.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }

    if (!ctx->private_queue.empty())
    {
        // run_task may return with or without the lock; splicing into
        // the global queue requires it.
        if (!lock->owns_lock())
            lock->lock();
        sched->completed_ops_.splice(ctx->private_queue);
    }
}
752
/* Core event loop step: runs at most one handler.

   Precondition: `lock` holds mutex_. May return with the lock released
   (after running a handler); callers re-acquire before looping.
   timeout_us semantics: <0 block indefinitely, 0 never block, >0 bound
   the wait in microseconds.

   @return 1 if a handler was executed, 0 otherwise.
*/
inline std::size_t
reactor_scheduler_base::do_one(
    std::unique_lock<std::mutex>& lock, long timeout_us, context_type* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel — time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers =
                !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());

            // Idle and non-blocking (or no work at all): put the
            // sentinel back and report nothing done.
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            // If handlers are pending (or polling), the reactor must
            // not block — mark it pre-interrupted.
            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Let another thread start on the pending handlers while
            // this thread runs the reactor poll.
            if (more_handlers)
                unlock_and_signal_one(lock);

            try
            {
                run_task(lock, ctx);
            }
            catch (...)
            {
                // Keep task_running_ consistent even on failure.
                task_running_.store(false, std::memory_order_relaxed);
                throw;
            }

            task_running_.store(false, std::memory_order_relaxed);
            // Re-arm the sentinel for the next cycle.
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // unassisted records whether another thread picked up the
            // remaining work; feeds the adaptive inline budget.
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // work_cleanup settles the work count and flushes the
            // private queue when the handler returns or throws.
            work_cleanup on_exit{this, &lock, ctx};
            (void)on_exit;

            // Run the handler with the lock released.
            (*op)();
            return 1;
        }

        // Try private queue before blocking
        if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
            continue;

        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // Nothing ready: wait (bounded or unbounded) for a signal.
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
834
835 } // namespace boost::corosio::detail
836
837 #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
838