TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SOCKET_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SOCKET_HPP
12 :
13 : #include <boost/corosio/tcp_socket.hpp>
14 : #include <boost/corosio/detail/intrusive.hpp>
15 : #include <boost/corosio/native/detail/reactor/reactor_op_base.hpp>
16 : #include <boost/corosio/native/detail/make_err.hpp>
17 : #include <boost/corosio/native/detail/endpoint_convert.hpp>
18 : #include <boost/corosio/detail/dispatch_coro.hpp>
19 : #include <boost/capy/buffers.hpp>
20 :
21 : #include <coroutine>
22 : #include <memory>
23 : #include <mutex>
24 : #include <utility>
25 :
26 : #include <errno.h>
27 : #include <netinet/in.h>
28 : #include <sys/socket.h>
29 : #include <sys/uio.h>
30 : #include <unistd.h>
31 :
32 : namespace boost::corosio::detail {
33 :
34 : /** CRTP base for reactor-backed socket implementations.
35 :
36 : Provides shared data members, trivial virtual overrides,
37 : non-virtual helper methods for cancellation, registration,
38 : close, and the full I/O dispatch logic (`do_connect`,
39 : `do_read_some`, `do_write_some`). Concrete backends inherit
40 : and add `cancel()`, `close_socket()`, and I/O overrides that
41 : delegate to the `do_*` helpers.
42 :
43 : @tparam Derived The concrete socket type (CRTP).
44 : @tparam Service The backend's socket service type.
45 : @tparam Op The backend's base op type.
46 : @tparam ConnOp The backend's connect op type.
47 : @tparam ReadOp The backend's read op type.
48 : @tparam WriteOp The backend's write op type.
49 : @tparam DescState The backend's descriptor_state type.
50 : */
51 : template<
52 : class Derived,
53 : class Service,
54 : class Op,
55 : class ConnOp,
56 : class ReadOp,
57 : class WriteOp,
58 : class DescState>
59 : class reactor_socket
60 : : public tcp_socket::implementation
61 : , public std::enable_shared_from_this<Derived>
62 : , public intrusive_list<Derived>::node
63 : {
64 : friend Derived;
65 :
66 HIT 22532 : explicit reactor_socket(Service& svc) noexcept : svc_(svc) {}
67 :
68 : protected:
69 : Service& svc_;
70 : int fd_ = -1;
71 : endpoint local_endpoint_;
72 : endpoint remote_endpoint_;
73 :
74 : public:
75 : /// Pending connect operation slot.
76 : ConnOp conn_;
77 :
78 : /// Pending read operation slot.
79 : ReadOp rd_;
80 :
81 : /// Pending write operation slot.
82 : WriteOp wr_;
83 :
84 : /// Per-descriptor state for persistent reactor registration.
85 : DescState desc_state_;
86 :
87 22532 : ~reactor_socket() override = default;
88 :
89 : /// Return the underlying file descriptor.
90 45509 : native_handle_type native_handle() const noexcept override
91 : {
92 45509 : return fd_;
93 : }
94 :
95 : /// Return the cached local endpoint.
96 38 : endpoint local_endpoint() const noexcept override
97 : {
98 38 : return local_endpoint_;
99 : }
100 :
101 : /// Return the cached remote endpoint.
102 42 : endpoint remote_endpoint() const noexcept override
103 : {
104 42 : return remote_endpoint_;
105 : }
106 :
107 : /// Return true if the socket has an open file descriptor.
108 : bool is_open() const noexcept
109 : {
110 : return fd_ >= 0;
111 : }
112 :
113 : /// Shut down part or all of the full-duplex connection.
114 6 : std::error_code shutdown(tcp_socket::shutdown_type what) noexcept override
115 : {
116 : int how;
117 6 : switch (what)
118 : {
119 2 : case tcp_socket::shutdown_receive:
120 2 : how = SHUT_RD;
121 2 : break;
122 2 : case tcp_socket::shutdown_send:
123 2 : how = SHUT_WR;
124 2 : break;
125 2 : case tcp_socket::shutdown_both:
126 2 : how = SHUT_RDWR;
127 2 : break;
128 MIS 0 : default:
129 0 : return make_err(EINVAL);
130 : }
131 HIT 6 : if (::shutdown(fd_, how) != 0)
132 MIS 0 : return make_err(errno);
133 HIT 6 : return {};
134 : }
135 :
136 : /// Set a socket option.
137 60 : std::error_code set_option(
138 : int level,
139 : int optname,
140 : void const* data,
141 : std::size_t size) noexcept override
142 : {
143 60 : if (::setsockopt(
144 60 : fd_, level, optname, data, static_cast<socklen_t>(size)) != 0)
145 MIS 0 : return make_err(errno);
146 HIT 60 : return {};
147 : }
148 :
149 : /// Get a socket option.
150 : std::error_code
151 62 : get_option(int level, int optname, void* data, std::size_t* size)
152 : const noexcept override
153 : {
154 62 : socklen_t len = static_cast<socklen_t>(*size);
155 62 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
156 MIS 0 : return make_err(errno);
157 HIT 62 : *size = static_cast<std::size_t>(len);
158 62 : return {};
159 : }
160 :
161 : /// Assign the file descriptor.
162 7481 : void set_socket(int fd) noexcept
163 : {
164 7481 : fd_ = fd;
165 7481 : }
166 :
167 : /// Cache local and remote endpoints.
168 14962 : void set_endpoints(endpoint local, endpoint remote) noexcept
169 : {
170 14962 : local_endpoint_ = local;
171 14962 : remote_endpoint_ = remote;
172 14962 : }
173 :
174 : /** Register an op with the reactor.
175 :
176 : Handles cached edge events and deferred cancellation.
177 : Called on the EAGAIN/EINPROGRESS path when speculative
178 : I/O failed.
179 : */
180 : void register_op(
181 : Op& op,
182 : reactor_op_base*& desc_slot,
183 : bool& ready_flag,
184 : bool& cancel_flag) noexcept;
185 :
186 : /** Cancel a single pending operation.
187 :
188 : Claims the operation from its descriptor_state slot under
189 : the mutex and posts it to the scheduler as cancelled.
190 :
191 : @param op The operation to cancel.
192 : */
193 : void cancel_single_op(Op& op) noexcept;
194 :
195 : /** Cancel all pending operations.
196 :
197 : Invoked by the derived class's cancel() override.
198 : */
199 : void do_cancel() noexcept;
200 :
201 : /** Close the socket and cancel pending operations.
202 :
203 : Invoked by the derived class's close_socket(). The
204 : derived class may add backend-specific cleanup after
205 : calling this method.
206 : */
207 : void do_close_socket() noexcept;
208 :
209 : /** Shared connect dispatch.
210 :
211 : Tries the connect syscall speculatively. On synchronous
212 : completion, returns via inline budget or posts through queue.
213 : On EINPROGRESS, registers with the reactor.
214 : */
215 : std::coroutine_handle<> do_connect(
216 : std::coroutine_handle<>,
217 : capy::executor_ref,
218 : endpoint,
219 : std::stop_token const&,
220 : std::error_code*);
221 :
222 : /** Shared scatter-read dispatch.
223 :
224 : Tries readv() speculatively. On success or hard error,
225 : returns via inline budget or posts through queue.
226 : On EAGAIN, registers with the reactor.
227 : */
228 : std::coroutine_handle<> do_read_some(
229 : std::coroutine_handle<>,
230 : capy::executor_ref,
231 : buffer_param,
232 : std::stop_token const&,
233 : std::error_code*,
234 : std::size_t*);
235 :
236 : /** Shared gather-write dispatch.
237 :
238 : Tries the write via WriteOp::write_policy speculatively.
239 : On success or hard error, returns via inline budget or
240 : posts through queue. On EAGAIN, registers with the reactor.
241 : */
242 : std::coroutine_handle<> do_write_some(
243 : std::coroutine_handle<>,
244 : capy::executor_ref,
245 : buffer_param,
246 : std::stop_token const&,
247 : std::error_code*,
248 : std::size_t*);
249 : };
250 :
251 : template<
252 : class Derived,
253 : class Service,
254 : class Op,
255 : class ConnOp,
256 : class ReadOp,
257 : class WriteOp,
258 : class DescState>
259 : void
260 7879 : reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
261 : register_op(
262 : Op& op,
263 : reactor_op_base*& desc_slot,
264 : bool& ready_flag,
265 : bool& cancel_flag) noexcept
266 : {
267 7879 : svc_.work_started();
268 :
269 7879 : std::lock_guard lock(desc_state_.mutex);
270 7879 : bool io_done = false;
271 7879 : if (ready_flag)
272 : {
273 189 : ready_flag = false;
274 189 : op.perform_io();
275 189 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
276 189 : if (!io_done)
277 189 : op.errn = 0;
278 : }
279 :
280 7879 : if (cancel_flag)
281 : {
282 MIS 0 : cancel_flag = false;
283 0 : op.cancelled.store(true, std::memory_order_relaxed);
284 : }
285 :
286 HIT 7879 : if (io_done || op.cancelled.load(std::memory_order_acquire))
287 : {
288 MIS 0 : svc_.post(&op);
289 0 : svc_.work_finished();
290 : }
291 : else
292 : {
293 HIT 7879 : desc_slot = &op;
294 : }
295 7879 : }
296 :
297 : template<
298 : class Derived,
299 : class Service,
300 : class Op,
301 : class ConnOp,
302 : class ReadOp,
303 : class WriteOp,
304 : class DescState>
305 : void
306 193 : reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
307 : cancel_single_op(Op& op) noexcept
308 : {
309 193 : auto self = this->weak_from_this().lock();
310 193 : if (!self)
311 MIS 0 : return;
312 :
313 HIT 193 : op.request_cancel();
314 :
315 193 : reactor_op_base** desc_op_ptr = nullptr;
316 193 : if (&op == &conn_)
317 MIS 0 : desc_op_ptr = &desc_state_.connect_op;
318 HIT 193 : else if (&op == &rd_)
319 193 : desc_op_ptr = &desc_state_.read_op;
320 MIS 0 : else if (&op == &wr_)
321 0 : desc_op_ptr = &desc_state_.write_op;
322 :
323 HIT 193 : if (desc_op_ptr)
324 : {
325 193 : reactor_op_base* claimed = nullptr;
326 : {
327 193 : std::lock_guard lock(desc_state_.mutex);
328 193 : if (*desc_op_ptr == &op)
329 193 : claimed = std::exchange(*desc_op_ptr, nullptr);
330 MIS 0 : else if (&op == &conn_)
331 0 : desc_state_.connect_cancel_pending = true;
332 0 : else if (&op == &rd_)
333 0 : desc_state_.read_cancel_pending = true;
334 0 : else if (&op == &wr_)
335 0 : desc_state_.write_cancel_pending = true;
336 HIT 193 : }
337 193 : if (claimed)
338 : {
339 193 : op.impl_ptr = self;
340 193 : svc_.post(&op);
341 193 : svc_.work_finished();
342 : }
343 : }
344 193 : }
345 :
346 : template<
347 : class Derived,
348 : class Service,
349 : class Op,
350 : class ConnOp,
351 : class ReadOp,
352 : class WriteOp,
353 : class DescState>
354 : void
355 190 : reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
356 : do_cancel() noexcept
357 : {
358 190 : auto self = this->weak_from_this().lock();
359 190 : if (!self)
360 MIS 0 : return;
361 :
362 HIT 190 : conn_.request_cancel();
363 190 : rd_.request_cancel();
364 190 : wr_.request_cancel();
365 :
366 190 : reactor_op_base* conn_claimed = nullptr;
367 190 : reactor_op_base* rd_claimed = nullptr;
368 190 : reactor_op_base* wr_claimed = nullptr;
369 : {
370 190 : std::lock_guard lock(desc_state_.mutex);
371 190 : if (desc_state_.connect_op == &conn_)
372 MIS 0 : conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
373 HIT 190 : if (desc_state_.read_op == &rd_)
374 99 : rd_claimed = std::exchange(desc_state_.read_op, nullptr);
375 190 : if (desc_state_.write_op == &wr_)
376 MIS 0 : wr_claimed = std::exchange(desc_state_.write_op, nullptr);
377 HIT 190 : }
378 :
379 190 : if (conn_claimed)
380 : {
381 MIS 0 : conn_.impl_ptr = self;
382 0 : svc_.post(&conn_);
383 0 : svc_.work_finished();
384 : }
385 HIT 190 : if (rd_claimed)
386 : {
387 99 : rd_.impl_ptr = self;
388 99 : svc_.post(&rd_);
389 99 : svc_.work_finished();
390 : }
391 190 : if (wr_claimed)
392 : {
393 MIS 0 : wr_.impl_ptr = self;
394 0 : svc_.post(&wr_);
395 0 : svc_.work_finished();
396 : }
397 HIT 190 : }
398 :
399 : template<
400 : class Derived,
401 : class Service,
402 : class Op,
403 : class ConnOp,
404 : class ReadOp,
405 : class WriteOp,
406 : class DescState>
407 : void
408 67575 : reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
409 : do_close_socket() noexcept
410 : {
411 67575 : auto self = this->weak_from_this().lock();
412 67575 : if (self)
413 : {
414 67575 : conn_.request_cancel();
415 67575 : rd_.request_cancel();
416 67575 : wr_.request_cancel();
417 :
418 67575 : reactor_op_base* conn_claimed = nullptr;
419 67575 : reactor_op_base* rd_claimed = nullptr;
420 67575 : reactor_op_base* wr_claimed = nullptr;
421 : {
422 67575 : std::lock_guard lock(desc_state_.mutex);
423 67575 : conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
424 67575 : rd_claimed = std::exchange(desc_state_.read_op, nullptr);
425 67575 : wr_claimed = std::exchange(desc_state_.write_op, nullptr);
426 67575 : desc_state_.read_ready = false;
427 67575 : desc_state_.write_ready = false;
428 67575 : desc_state_.read_cancel_pending = false;
429 67575 : desc_state_.write_cancel_pending = false;
430 67575 : desc_state_.connect_cancel_pending = false;
431 :
432 : // Keep impl alive while descriptor_state is queued in the
433 : // scheduler. Must be under mutex to avoid racing with
434 : // invoke_deferred_io()'s move of impl_ref_.
435 67575 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
436 163 : desc_state_.impl_ref_ = self;
437 67575 : }
438 :
439 67575 : if (conn_claimed)
440 : {
441 MIS 0 : conn_.impl_ptr = self;
442 0 : svc_.post(&conn_);
443 0 : svc_.work_finished();
444 : }
445 HIT 67575 : if (rd_claimed)
446 : {
447 2 : rd_.impl_ptr = self;
448 2 : svc_.post(&rd_);
449 2 : svc_.work_finished();
450 : }
451 67575 : if (wr_claimed)
452 : {
453 MIS 0 : wr_.impl_ptr = self;
454 0 : svc_.post(&wr_);
455 0 : svc_.work_finished();
456 : }
457 : }
458 :
459 HIT 67575 : if (fd_ >= 0)
460 : {
461 14996 : if (desc_state_.registered_events != 0)
462 14996 : svc_.scheduler().deregister_descriptor(fd_);
463 14996 : ::close(fd_);
464 14996 : fd_ = -1;
465 : }
466 :
467 67575 : desc_state_.fd = -1;
468 67575 : desc_state_.registered_events = 0;
469 :
470 67575 : local_endpoint_ = endpoint{};
471 67575 : remote_endpoint_ = endpoint{};
472 67575 : }
473 :
474 : template<
475 : class Derived,
476 : class Service,
477 : class Op,
478 : class ConnOp,
479 : class ReadOp,
480 : class WriteOp,
481 : class DescState>
482 : std::coroutine_handle<>
483 7485 : reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
484 : do_connect(
485 : std::coroutine_handle<> h,
486 : capy::executor_ref ex,
487 : endpoint ep,
488 : std::stop_token const& token,
489 : std::error_code* ec)
490 : {
491 7485 : auto& op = conn_;
492 :
493 7485 : sockaddr_storage storage{};
494 7485 : socklen_t addrlen = to_sockaddr(ep, socket_family(fd_), storage);
495 7485 : int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
496 :
497 7485 : if (result == 0)
498 : {
499 MIS 0 : sockaddr_storage local_storage{};
500 0 : socklen_t local_len = sizeof(local_storage);
501 0 : if (::getsockname(
502 0 : fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
503 : 0)
504 0 : local_endpoint_ = from_sockaddr(local_storage);
505 0 : remote_endpoint_ = ep;
506 : }
507 :
508 HIT 7485 : if (result == 0 || errno != EINPROGRESS)
509 : {
510 MIS 0 : int err = (result < 0) ? errno : 0;
511 0 : if (svc_.scheduler().try_consume_inline_budget())
512 : {
513 0 : *ec = err ? make_err(err) : std::error_code{};
514 0 : return dispatch_coro(ex, h);
515 : }
516 0 : op.reset();
517 0 : op.h = h;
518 0 : op.ex = ex;
519 0 : op.ec_out = ec;
520 0 : op.fd = fd_;
521 0 : op.target_endpoint = ep;
522 0 : op.start(token, static_cast<Derived*>(this));
523 0 : op.impl_ptr = this->shared_from_this();
524 0 : op.complete(err, 0);
525 0 : svc_.post(&op);
526 0 : return std::noop_coroutine();
527 : }
528 :
529 : // EINPROGRESS — register with reactor
530 HIT 7485 : op.reset();
531 7485 : op.h = h;
532 7485 : op.ex = ex;
533 7485 : op.ec_out = ec;
534 7485 : op.fd = fd_;
535 7485 : op.target_endpoint = ep;
536 7485 : op.start(token, static_cast<Derived*>(this));
537 7485 : op.impl_ptr = this->shared_from_this();
538 :
539 7485 : register_op(
540 7485 : op, desc_state_.connect_op, desc_state_.write_ready,
541 7485 : desc_state_.connect_cancel_pending);
542 7485 : return std::noop_coroutine();
543 : }
544 :
545 : template<
546 : class Derived,
547 : class Service,
548 : class Op,
549 : class ConnOp,
550 : class ReadOp,
551 : class WriteOp,
552 : class DescState>
553 : std::coroutine_handle<>
554 253679 : reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
555 : do_read_some(
556 : std::coroutine_handle<> h,
557 : capy::executor_ref ex,
558 : buffer_param param,
559 : std::stop_token const& token,
560 : std::error_code* ec,
561 : std::size_t* bytes_out)
562 : {
563 253679 : auto& op = rd_;
564 253679 : op.reset();
565 :
566 253679 : capy::mutable_buffer bufs[ReadOp::max_buffers];
567 253679 : op.iovec_count = static_cast<int>(param.copy_to(bufs, ReadOp::max_buffers));
568 :
569 253679 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
570 : {
571 2 : op.empty_buffer_read = true;
572 2 : op.h = h;
573 2 : op.ex = ex;
574 2 : op.ec_out = ec;
575 2 : op.bytes_out = bytes_out;
576 2 : op.start(token, static_cast<Derived*>(this));
577 2 : op.impl_ptr = this->shared_from_this();
578 2 : op.complete(0, 0);
579 2 : svc_.post(&op);
580 2 : return std::noop_coroutine();
581 : }
582 :
583 507354 : for (int i = 0; i < op.iovec_count; ++i)
584 : {
585 253677 : op.iovecs[i].iov_base = bufs[i].data();
586 253677 : op.iovecs[i].iov_len = bufs[i].size();
587 : }
588 :
589 : // Speculative read
590 : ssize_t n;
591 : do
592 : {
593 253677 : n = ::readv(fd_, op.iovecs, op.iovec_count);
594 : }
595 253677 : while (n < 0 && errno == EINTR);
596 :
597 253677 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
598 : {
599 253283 : int err = (n < 0) ? errno : 0;
600 253283 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
601 :
602 253283 : if (svc_.scheduler().try_consume_inline_budget())
603 : {
604 202662 : if (err)
605 MIS 0 : *ec = make_err(err);
606 HIT 202662 : else if (n == 0)
607 10 : *ec = capy::error::eof;
608 : else
609 202652 : *ec = {};
610 202662 : *bytes_out = bytes;
611 202662 : return dispatch_coro(ex, h);
612 : }
613 50621 : op.h = h;
614 50621 : op.ex = ex;
615 50621 : op.ec_out = ec;
616 50621 : op.bytes_out = bytes_out;
617 50621 : op.start(token, static_cast<Derived*>(this));
618 50621 : op.impl_ptr = this->shared_from_this();
619 50621 : op.complete(err, bytes);
620 50621 : svc_.post(&op);
621 50621 : return std::noop_coroutine();
622 : }
623 :
624 : // EAGAIN — register with reactor
625 394 : op.h = h;
626 394 : op.ex = ex;
627 394 : op.ec_out = ec;
628 394 : op.bytes_out = bytes_out;
629 394 : op.fd = fd_;
630 394 : op.start(token, static_cast<Derived*>(this));
631 394 : op.impl_ptr = this->shared_from_this();
632 :
633 394 : register_op(
634 394 : op, desc_state_.read_op, desc_state_.read_ready,
635 394 : desc_state_.read_cancel_pending);
636 394 : return std::noop_coroutine();
637 : }
638 :
639 : template<
640 : class Derived,
641 : class Service,
642 : class Op,
643 : class ConnOp,
644 : class ReadOp,
645 : class WriteOp,
646 : class DescState>
647 : std::coroutine_handle<>
648 253378 : reactor_socket<Derived, Service, Op, ConnOp, ReadOp, WriteOp, DescState>::
649 : do_write_some(
650 : std::coroutine_handle<> h,
651 : capy::executor_ref ex,
652 : buffer_param param,
653 : std::stop_token const& token,
654 : std::error_code* ec,
655 : std::size_t* bytes_out)
656 : {
657 253378 : auto& op = wr_;
658 253378 : op.reset();
659 :
660 253378 : capy::mutable_buffer bufs[WriteOp::max_buffers];
661 253378 : op.iovec_count =
662 253378 : static_cast<int>(param.copy_to(bufs, WriteOp::max_buffers));
663 :
664 253378 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
665 : {
666 2 : op.h = h;
667 2 : op.ex = ex;
668 2 : op.ec_out = ec;
669 2 : op.bytes_out = bytes_out;
670 2 : op.start(token, static_cast<Derived*>(this));
671 2 : op.impl_ptr = this->shared_from_this();
672 2 : op.complete(0, 0);
673 2 : svc_.post(&op);
674 2 : return std::noop_coroutine();
675 : }
676 :
677 506752 : for (int i = 0; i < op.iovec_count; ++i)
678 : {
679 253376 : op.iovecs[i].iov_base = bufs[i].data();
680 253376 : op.iovecs[i].iov_len = bufs[i].size();
681 : }
682 :
683 : // Speculative write via backend-specific write policy
684 253376 : ssize_t n = WriteOp::write_policy::write(fd_, op.iovecs, op.iovec_count);
685 :
686 253376 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
687 : {
688 253376 : int err = (n < 0) ? errno : 0;
689 253376 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
690 :
691 253376 : if (svc_.scheduler().try_consume_inline_budget())
692 : {
693 202719 : *ec = err ? make_err(err) : std::error_code{};
694 202719 : *bytes_out = bytes;
695 202719 : return dispatch_coro(ex, h);
696 : }
697 50657 : op.h = h;
698 50657 : op.ex = ex;
699 50657 : op.ec_out = ec;
700 50657 : op.bytes_out = bytes_out;
701 50657 : op.start(token, static_cast<Derived*>(this));
702 50657 : op.impl_ptr = this->shared_from_this();
703 50657 : op.complete(err, bytes);
704 50657 : svc_.post(&op);
705 50657 : return std::noop_coroutine();
706 : }
707 :
708 : // EAGAIN — register with reactor
709 MIS 0 : op.h = h;
710 0 : op.ex = ex;
711 0 : op.ec_out = ec;
712 0 : op.bytes_out = bytes_out;
713 0 : op.fd = fd_;
714 0 : op.start(token, static_cast<Derived*>(this));
715 0 : op.impl_ptr = this->shared_from_this();
716 :
717 0 : register_op(
718 0 : op, desc_state_.write_op, desc_state_.write_ready,
719 0 : desc_state_.write_cancel_pending);
720 0 : return std::noop_coroutine();
721 : }
722 :
723 : } // namespace boost::corosio::detail
724 :
725 : #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_SOCKET_HPP
|