TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
12 :
#include <boost/corosio/detail/config.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include <boost/corosio/native/native_scheduler.hpp>
#include <boost/corosio/detail/scheduler_op.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <coroutine>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <mutex>
29 :
30 : namespace boost::corosio::detail {
31 :
// Forward declaration: reactor_scheduler_context stores a pointer to
// its owning scheduler, defined further below.
class reactor_scheduler_base;
34 :
35 : /** Per-thread state for a reactor scheduler.
36 :
37 : Each thread running a scheduler's event loop has one of these
38 : on a thread-local stack. It holds a private work queue and
39 : inline completion budget for speculative I/O fast paths.
40 : */
41 : struct BOOST_COROSIO_SYMBOL_VISIBLE reactor_scheduler_context
42 : {
43 : /// Scheduler this context belongs to.
44 : reactor_scheduler_base const* key;
45 :
46 : /// Next context frame on this thread's stack.
47 : reactor_scheduler_context* next;
48 :
49 : /// Private work queue for reduced contention.
50 : op_queue private_queue;
51 :
52 : /// Unflushed work count for the private queue.
53 : std::int64_t private_outstanding_work;
54 :
55 : /// Remaining inline completions allowed this cycle.
56 : int inline_budget;
57 :
58 : /// Maximum inline budget (adaptive, 2-16).
59 : int inline_budget_max;
60 :
61 : /// True if no other thread absorbed queued work last cycle.
62 : bool unassisted;
63 :
64 : /// Construct a context frame linked to @a n.
65 HIT 363 : reactor_scheduler_context(
66 : reactor_scheduler_base const* k, reactor_scheduler_context* n)
67 363 : : key(k)
68 363 : , next(n)
69 363 : , private_outstanding_work(0)
70 363 : , inline_budget(0)
71 363 : , inline_budget_max(2)
72 363 : , unassisted(false)
73 : {
74 363 : }
75 : };
76 :
/// Thread-local context stack for reactor schedulers.
/// Each nested run()/poll() invocation on a thread pushes one frame;
/// reactor_find_context() walks the `next` links to locate a frame.
inline thread_local_ptr<reactor_scheduler_context> reactor_context_stack;
79 :
80 : /// Find the context frame for a scheduler on this thread.
81 : inline reactor_scheduler_context*
82 914875 : reactor_find_context(reactor_scheduler_base const* self) noexcept
83 : {
84 914875 : for (auto* c = reactor_context_stack.get(); c != nullptr; c = c->next)
85 : {
86 912387 : if (c->key == self)
87 912387 : return c;
88 : }
89 2488 : return nullptr;
90 : }
91 :
92 : /// Flush private work count to global counter.
93 : inline void
94 MIS 0 : reactor_flush_private_work(
95 : reactor_scheduler_context* ctx,
96 : std::atomic<std::int64_t>& outstanding_work) noexcept
97 : {
98 0 : if (ctx && ctx->private_outstanding_work > 0)
99 : {
100 0 : outstanding_work.fetch_add(
101 : ctx->private_outstanding_work, std::memory_order_relaxed);
102 0 : ctx->private_outstanding_work = 0;
103 : }
104 0 : }
105 :
106 : /** Drain private queue to global queue, flushing work count first.
107 :
108 : @return True if any ops were drained.
109 : */
110 : inline bool
111 0 : reactor_drain_private_queue(
112 : reactor_scheduler_context* ctx,
113 : std::atomic<std::int64_t>& outstanding_work,
114 : op_queue& completed_ops) noexcept
115 : {
116 0 : if (!ctx || ctx->private_queue.empty())
117 0 : return false;
118 :
119 0 : reactor_flush_private_work(ctx, outstanding_work);
120 0 : completed_ops.splice(ctx->private_queue);
121 0 : return true;
122 : }
123 :
/** Non-template base for reactor-backed scheduler implementations.

    Provides the complete threading model shared by epoll, kqueue,
    and select schedulers: signal state machine, inline completion
    budget, work counting, run/poll methods, and the do_one event
    loop.

    Derived classes provide platform-specific hooks by overriding:
    - `run_task(lock, ctx)` to run the reactor poll
    - `interrupt_reactor()` to wake a blocked reactor

    De-templated from the original CRTP design to eliminate
    duplicate instantiations when multiple backends are compiled
    into the same binary. Virtual dispatch for run_task (called
    once per reactor cycle, before a blocking syscall) has
    negligible overhead.

    @par Thread Safety
    All public member functions are thread-safe.
*/
class reactor_scheduler_base
    : public native_scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;
    using context_type = reactor_scheduler_context;

    /// Post a coroutine for deferred execution.
    void post(std::coroutine_handle<> h) const override;

    /// Post a scheduler operation for deferred execution.
    void post(scheduler_op* h) const override;

    /// Return true if called from a thread running this scheduler.
    bool running_in_this_thread() const noexcept override;

    /// Request the scheduler to stop dispatching handlers.
    void stop() override;

    /// Return true if the scheduler has been stopped.
    bool stopped() const noexcept override;

    /// Reset the stopped state so `run()` can resume.
    void restart() override;

    /// Run the event loop until no work remains.
    std::size_t run() override;

    /// Run until one handler completes or no work remains.
    std::size_t run_one() override;

    /// Run until one handler completes or @a usec elapses.
    std::size_t wait_one(long usec) override;

    /// Run ready handlers without blocking.
    std::size_t poll() override;

    /// Run at most one ready handler without blocking.
    std::size_t poll_one() override;

    /// Increment the outstanding work count.
    void work_started() noexcept override;

    /// Decrement the outstanding work count, stopping on zero.
    void work_finished() noexcept override;

    /** Reset the thread's inline completion budget.

        Called at the start of each posted completion handler to
        grant a fresh budget for speculative inline completions.
    */
    void reset_inline_budget() const noexcept;

    /** Consume one unit of inline budget if available.

        @return True if budget was available and consumed.
    */
    bool try_consume_inline_budget() const noexcept;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and
        no handler will be executed. Must be called from a scheduler
        thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from thread context's private queue to global queue.

        Flushes private work count to the global counter, then
        transfers the queue under mutex protection.

        @param queue The private queue to drain.
        @param count Private work count to flush before draining.
    */
    void drain_thread_queue(op_queue& queue, std::int64_t count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations
        go to the thread's private queue (fast path). Otherwise,
        operations are added to the global queue under mutex and a
        waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
    */
    void post_deferred_completions(op_queue& ops) const;

protected:
    reactor_scheduler_base() = default;

    /** Drain completed_ops during shutdown.

        Pops all operations from the global queue and destroys them,
        skipping the task sentinel. Signals all waiting threads.
        Derived classes call this from their shutdown() override
        before performing platform-specific cleanup.
    */
    void shutdown_drain();

    /// RAII guard that re-inserts the task sentinel after `run_task`.
    struct task_cleanup
    {
        reactor_scheduler_base const* sched;
        std::unique_lock<std::mutex>* lock;
        context_type* ctx;
        ~task_cleanup();
    };

    /// Protects completed_ops_, stopped_, task_interrupted_, state_.
    mutable std::mutex mutex_;
    mutable std::condition_variable cond_;
    /// Global queue of ready-to-run operations (guarded by mutex_).
    mutable op_queue completed_ops_;
    /// Count of pending work; the scheduler stops when it reaches zero.
    mutable std::atomic<std::int64_t> outstanding_work_{0};
    bool stopped_ = false;
    /// True while a thread is inside run_task() (the reactor poll).
    mutable std::atomic<bool> task_running_{false};
    /// True once the running reactor has been asked to wake up,
    /// so redundant interrupt_reactor() calls are avoided.
    mutable bool task_interrupted_ = false;

    /// Bit 0 of `state_`: set when the condvar should be signaled.
    static constexpr std::size_t signaled_bit = 1;

    /// Increment per waiting thread in `state_`.
    static constexpr std::size_t waiter_increment = 2;
    mutable std::size_t state_ = 0;

    /// Sentinel op that triggers a reactor poll when dequeued.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;

    /// Run the platform-specific reactor poll.
    virtual void
    run_task(std::unique_lock<std::mutex>& lock, context_type* ctx) = 0;

    /// Wake a blocked reactor (e.g. write to eventfd or pipe).
    virtual void interrupt_reactor() const = 0;

private:
    /// RAII guard run after each executed handler; reconciles the
    /// handler's privately-counted work with the global counter.
    struct work_cleanup
    {
        reactor_scheduler_base* sched;
        std::unique_lock<std::mutex>* lock;
        context_type* ctx;
        ~work_cleanup();
    };

    std::size_t do_one(
        std::unique_lock<std::mutex>& lock, long timeout_us, context_type* ctx);

    void signal_all(std::unique_lock<std::mutex>& lock) const;
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
    bool unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
    void clear_signal() const;
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
    void wait_for_signal_for(
        std::unique_lock<std::mutex>& lock, long timeout_us) const;
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
};
308 :
309 : /** RAII guard that pushes/pops a scheduler context frame.
310 :
311 : On construction, pushes a new context frame onto the
312 : thread-local stack. On destruction, drains any remaining
313 : private queue items to the global queue and pops the frame.
314 : */
315 : struct reactor_thread_context_guard
316 : {
317 : /// The context frame managed by this guard.
318 : reactor_scheduler_context frame_;
319 :
320 : /// Construct the guard, pushing a frame for @a sched.
321 HIT 363 : explicit reactor_thread_context_guard(
322 : reactor_scheduler_base const* sched) noexcept
323 363 : : frame_(sched, reactor_context_stack.get())
324 : {
325 363 : reactor_context_stack.set(&frame_);
326 363 : }
327 :
328 : /// Destroy the guard, draining private work and popping the frame.
329 363 : ~reactor_thread_context_guard() noexcept
330 : {
331 363 : if (!frame_.private_queue.empty())
332 MIS 0 : frame_.key->drain_thread_queue(
333 0 : frame_.private_queue, frame_.private_outstanding_work);
334 HIT 363 : reactor_context_stack.set(frame_.next);
335 363 : }
336 : };
337 :
338 : // ---- Inline implementations ------------------------------------------------
339 :
340 : inline void
341 116654 : reactor_scheduler_base::reset_inline_budget() const noexcept
342 : {
343 116654 : if (auto* ctx = reactor_find_context(this))
344 : {
345 : // Cap when no other thread absorbed queued work
346 116654 : if (ctx->unassisted)
347 : {
348 116654 : ctx->inline_budget_max = 4;
349 116654 : ctx->inline_budget = 4;
350 116654 : return;
351 : }
352 : // Ramp up when previous cycle fully consumed budget
353 MIS 0 : if (ctx->inline_budget == 0)
354 0 : ctx->inline_budget_max = (std::min)(ctx->inline_budget_max * 2, 16);
355 0 : else if (ctx->inline_budget < ctx->inline_budget_max)
356 0 : ctx->inline_budget_max = 2;
357 0 : ctx->inline_budget = ctx->inline_budget_max;
358 : }
359 : }
360 :
361 : inline bool
362 HIT 506663 : reactor_scheduler_base::try_consume_inline_budget() const noexcept
363 : {
364 506663 : if (auto* ctx = reactor_find_context(this))
365 : {
366 506663 : if (ctx->inline_budget > 0)
367 : {
368 405381 : --ctx->inline_budget;
369 405381 : return true;
370 : }
371 : }
372 101282 : return false;
373 : }
374 :
375 : inline void
376 10133 : reactor_scheduler_base::post(std::coroutine_handle<> h) const
377 : {
378 : struct post_handler final : scheduler_op
379 : {
380 : std::coroutine_handle<> h_;
381 :
382 10133 : explicit post_handler(std::coroutine_handle<> h) : h_(h) {}
383 20266 : ~post_handler() override = default;
384 :
385 10124 : void operator()() override
386 : {
387 10124 : auto saved = h_;
388 10124 : delete this;
389 : // Ensure stores from the posting thread are visible
390 : std::atomic_thread_fence(std::memory_order_acquire);
391 10124 : saved.resume();
392 10124 : }
393 :
394 9 : void destroy() override
395 : {
396 9 : auto saved = h_;
397 9 : delete this;
398 9 : saved.destroy();
399 9 : }
400 : };
401 :
402 10133 : auto ph = std::make_unique<post_handler>(h);
403 :
404 10133 : if (auto* ctx = reactor_find_context(this))
405 : {
406 8195 : ++ctx->private_outstanding_work;
407 8195 : ctx->private_queue.push(ph.release());
408 8195 : return;
409 : }
410 :
411 1938 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
412 :
413 1938 : std::unique_lock lock(mutex_);
414 1938 : completed_ops_.push(ph.release());
415 1938 : wake_one_thread_and_unlock(lock);
416 10133 : }
417 :
418 : inline void
419 109609 : reactor_scheduler_base::post(scheduler_op* h) const
420 : {
421 109609 : if (auto* ctx = reactor_find_context(this))
422 : {
423 109575 : ++ctx->private_outstanding_work;
424 109575 : ctx->private_queue.push(h);
425 109575 : return;
426 : }
427 :
428 34 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
429 :
430 34 : std::unique_lock lock(mutex_);
431 34 : completed_ops_.push(h);
432 34 : wake_one_thread_and_unlock(lock);
433 34 : }
434 :
435 : inline bool
436 1118 : reactor_scheduler_base::running_in_this_thread() const noexcept
437 : {
438 1118 : return reactor_find_context(this) != nullptr;
439 : }
440 :
441 : inline void
442 353 : reactor_scheduler_base::stop()
443 : {
444 353 : std::unique_lock lock(mutex_);
445 353 : if (!stopped_)
446 : {
447 317 : stopped_ = true;
448 317 : signal_all(lock);
449 317 : interrupt_reactor();
450 : }
451 353 : }
452 :
453 : inline bool
454 21 : reactor_scheduler_base::stopped() const noexcept
455 : {
456 21 : std::unique_lock lock(mutex_);
457 42 : return stopped_;
458 21 : }
459 :
460 : inline void
461 91 : reactor_scheduler_base::restart()
462 : {
463 91 : std::unique_lock lock(mutex_);
464 91 : stopped_ = false;
465 91 : }
466 :
467 : inline std::size_t
468 331 : reactor_scheduler_base::run()
469 : {
470 662 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
471 : {
472 28 : stop();
473 28 : return 0;
474 : }
475 :
476 303 : reactor_thread_context_guard ctx(this);
477 303 : std::unique_lock lock(mutex_);
478 :
479 303 : std::size_t n = 0;
480 : for (;;)
481 : {
482 305727 : if (!do_one(lock, -1, &ctx.frame_))
483 303 : break;
484 305424 : if (n != (std::numeric_limits<std::size_t>::max)())
485 305424 : ++n;
486 305424 : if (!lock.owns_lock())
487 195594 : lock.lock();
488 : }
489 303 : return n;
490 303 : }
491 :
492 : inline std::size_t
493 2 : reactor_scheduler_base::run_one()
494 : {
495 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
496 : {
497 MIS 0 : stop();
498 0 : return 0;
499 : }
500 :
501 HIT 2 : reactor_thread_context_guard ctx(this);
502 2 : std::unique_lock lock(mutex_);
503 2 : return do_one(lock, -1, &ctx.frame_);
504 2 : }
505 :
506 : inline std::size_t
507 61 : reactor_scheduler_base::wait_one(long usec)
508 : {
509 122 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
510 : {
511 10 : stop();
512 10 : return 0;
513 : }
514 :
515 51 : reactor_thread_context_guard ctx(this);
516 51 : std::unique_lock lock(mutex_);
517 51 : return do_one(lock, usec, &ctx.frame_);
518 51 : }
519 :
520 : inline std::size_t
521 6 : reactor_scheduler_base::poll()
522 : {
523 12 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
524 : {
525 1 : stop();
526 1 : return 0;
527 : }
528 :
529 5 : reactor_thread_context_guard ctx(this);
530 5 : std::unique_lock lock(mutex_);
531 :
532 5 : std::size_t n = 0;
533 : for (;;)
534 : {
535 11 : if (!do_one(lock, 0, &ctx.frame_))
536 5 : break;
537 6 : if (n != (std::numeric_limits<std::size_t>::max)())
538 6 : ++n;
539 6 : if (!lock.owns_lock())
540 6 : lock.lock();
541 : }
542 5 : return n;
543 5 : }
544 :
545 : inline std::size_t
546 4 : reactor_scheduler_base::poll_one()
547 : {
548 8 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
549 : {
550 2 : stop();
551 2 : return 0;
552 : }
553 :
554 2 : reactor_thread_context_guard ctx(this);
555 2 : std::unique_lock lock(mutex_);
556 2 : return do_one(lock, 0, &ctx.frame_);
557 2 : }
558 :
559 : inline void
560 24478 : reactor_scheduler_base::work_started() noexcept
561 : {
562 24478 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
563 24478 : }
564 :
565 : inline void
566 34351 : reactor_scheduler_base::work_finished() noexcept
567 : {
568 68702 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
569 309 : stop();
570 34351 : }
571 :
572 : inline void
573 170698 : reactor_scheduler_base::compensating_work_started() const noexcept
574 : {
575 170698 : auto* ctx = reactor_find_context(this);
576 170698 : if (ctx)
577 170698 : ++ctx->private_outstanding_work;
578 170698 : }
579 :
580 : inline void
581 MIS 0 : reactor_scheduler_base::drain_thread_queue(
582 : op_queue& queue, std::int64_t count) const
583 : {
584 0 : if (count > 0)
585 0 : outstanding_work_.fetch_add(count, std::memory_order_relaxed);
586 :
587 0 : std::unique_lock lock(mutex_);
588 0 : completed_ops_.splice(queue);
589 0 : if (count > 0)
590 0 : maybe_unlock_and_signal_one(lock);
591 0 : }
592 :
593 : inline void
594 HIT 15062 : reactor_scheduler_base::post_deferred_completions(op_queue& ops) const
595 : {
596 15062 : if (ops.empty())
597 15062 : return;
598 :
599 MIS 0 : if (auto* ctx = reactor_find_context(this))
600 : {
601 0 : ctx->private_queue.splice(ops);
602 0 : return;
603 : }
604 :
605 0 : std::unique_lock lock(mutex_);
606 0 : completed_ops_.splice(ops);
607 0 : wake_one_thread_and_unlock(lock);
608 0 : }
609 :
610 : inline void
611 HIT 412 : reactor_scheduler_base::shutdown_drain()
612 : {
613 412 : std::unique_lock lock(mutex_);
614 :
615 893 : while (auto* h = completed_ops_.pop())
616 : {
617 481 : if (h == &task_op_)
618 412 : continue;
619 69 : lock.unlock();
620 69 : h->destroy();
621 69 : lock.lock();
622 481 : }
623 :
624 412 : signal_all(lock);
625 412 : }
626 :
627 : inline void
628 729 : reactor_scheduler_base::signal_all(std::unique_lock<std::mutex>&) const
629 : {
630 729 : state_ |= signaled_bit;
631 729 : cond_.notify_all();
632 729 : }
633 :
634 : inline bool
635 1972 : reactor_scheduler_base::maybe_unlock_and_signal_one(
636 : std::unique_lock<std::mutex>& lock) const
637 : {
638 1972 : state_ |= signaled_bit;
639 1972 : if (state_ > signaled_bit)
640 : {
641 MIS 0 : lock.unlock();
642 0 : cond_.notify_one();
643 0 : return true;
644 : }
645 HIT 1972 : return false;
646 : }
647 :
// Latch the signaled bit, always release the lock, then notify one
// waiter only if any thread was registered as waiting. The waiter
// check happens before unlock (state_ is mutex-protected); the
// notify happens after, avoiding a wakeup that immediately blocks
// on the still-held mutex.
inline bool
reactor_scheduler_base::unlock_and_signal_one(
    std::unique_lock<std::mutex>& lock) const
{
    state_ |= signaled_bit;
    bool have_waiters = state_ > signaled_bit;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
    return have_waiters;
}
659 :
660 : inline void
661 MIS 0 : reactor_scheduler_base::clear_signal() const
662 : {
663 0 : state_ &= ~signaled_bit;
664 0 : }
665 :
// Block until the signaled bit is set. The waiter count (encoded in
// state_ as multiples of waiter_increment) is raised around each
// wait so signalers holding mutex_ can see whether anyone needs a
// notify; the loop re-checks the bit to absorb spurious wakeups.
inline void
reactor_scheduler_base::wait_for_signal(
    std::unique_lock<std::mutex>& lock) const
{
    while ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait(lock);
        state_ -= waiter_increment;
    }
}
677 :
// Wait at most @a timeout_us microseconds for the signaled bit.
// Single-shot (no loop): a spurious or timed-out wakeup simply
// returns, letting the caller re-evaluate. The waiter count is
// raised around the wait exactly as in wait_for_signal().
inline void
reactor_scheduler_base::wait_for_signal_for(
    std::unique_lock<std::mutex>& lock, long timeout_us) const
{
    if ((state_ & signaled_bit) == 0)
    {
        state_ += waiter_increment;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= waiter_increment;
    }
}
689 :
690 : inline void
691 HIT 1972 : reactor_scheduler_base::wake_one_thread_and_unlock(
692 : std::unique_lock<std::mutex>& lock) const
693 : {
694 1972 : if (maybe_unlock_and_signal_one(lock))
695 MIS 0 : return;
696 :
697 HIT 1972 : if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
698 : {
699 26 : task_interrupted_ = true;
700 26 : lock.unlock();
701 26 : interrupt_reactor();
702 : }
703 : else
704 : {
705 1946 : lock.unlock();
706 : }
707 : }
708 :
// Runs after each handler executed by do_one(). Reconciles the
// handler's private work production against the one unit of global
// work the handler itself consumed:
//   produced > 1 : publish the surplus (produced - 1) globally;
//   produced == 1: the new unit exactly replaces the consumed one;
//   produced < 1 : nothing replaced it — retire the consumed unit,
//                  possibly stopping the scheduler.
// Any ops the handler queued privately are spliced into the global
// queue under the re-acquired lock.
inline reactor_scheduler_base::work_cleanup::~work_cleanup()
{
    if (ctx)
    {
        std::int64_t produced = ctx->private_outstanding_work;
        if (produced > 1)
            sched->outstanding_work_.fetch_add(
                produced - 1, std::memory_order_relaxed);
        else if (produced < 1)
            sched->work_finished();
        ctx->private_outstanding_work = 0;

        if (!ctx->private_queue.empty())
        {
            lock->lock();
            sched->completed_ops_.splice(ctx->private_queue);
        }
    }
    else
    {
        // No thread context: just retire the consumed unit.
        sched->work_finished();
    }
}
732 :
733 403940 : inline reactor_scheduler_base::task_cleanup::~task_cleanup()
734 : {
735 201970 : if (!ctx)
736 MIS 0 : return;
737 :
738 HIT 201970 : if (ctx->private_outstanding_work > 0)
739 : {
740 7892 : sched->outstanding_work_.fetch_add(
741 7892 : ctx->private_outstanding_work, std::memory_order_relaxed);
742 7892 : ctx->private_outstanding_work = 0;
743 : }
744 :
745 201970 : if (!ctx->private_queue.empty())
746 : {
747 7892 : if (!lock->owns_lock())
748 MIS 0 : lock->lock();
749 HIT 7892 : sched->completed_ops_.splice(ctx->private_queue);
750 : }
751 201970 : }
752 :
/** Core event-loop step: run one handler, one reactor cycle, or wait.

    Entered with @a lock held. Returns 1 after executing a handler
    (lock released by work_cleanup unless the handler re-queued work),
    or 0 when stopped, idle, or the non-blocking timeout expires.
    @a timeout_us: -1 blocks indefinitely, 0 never blocks, >0 bounds
    one wait. @a ctx is the calling thread's context frame (non-null
    at every current call site; the sentinel branch is defensive).
*/
inline std::size_t
reactor_scheduler_base::do_one(
    std::unique_lock<std::mutex>& lock, long timeout_us, context_type* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel — time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers =
                !completed_ops_.empty() || (ctx && !ctx->private_queue.empty());

            // Idle (or non-blocking) with nothing queued: put the
            // sentinel back and report no progress.
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            // Pre-mark interrupted when the poll must not block:
            // handlers are pending or the caller asked for poll().
            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Pending handlers: let another thread run them while
            // this thread is inside the reactor.
            if (more_handlers)
                unlock_and_signal_one(lock);

            try
            {
                run_task(lock, ctx);
            }
            catch (...)
            {
                // Keep task_running_ consistent even on throw.
                task_running_.store(false, std::memory_order_relaxed);
                throw;
            }

            task_running_.store(false, std::memory_order_relaxed);
            // Re-arm the sentinel so a later pass polls again.
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            bool more = !completed_ops_.empty();

            // unassisted records whether no waiter took the signal —
            // reset_inline_budget() uses it to cap speculation.
            if (more)
                ctx->unassisted = !unlock_and_signal_one(lock);
            else
            {
                ctx->unassisted = false;
                lock.unlock();
            }

            // work_cleanup reconciles work counts after the handler,
            // even if it throws.
            work_cleanup on_exit{this, &lock, ctx};
            (void)on_exit;

            (*op)();
            return 1;
        }

        // Try private queue before blocking
        if (reactor_drain_private_queue(ctx, outstanding_work_, completed_ops_))
            continue;

        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // Nothing runnable: register as a waiter and sleep.
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
834 :
835 : } // namespace boost::corosio::detail
836 :
837 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SCHEDULER_HPP
|