TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/socket_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/select/select_socket.hpp>
22 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
23 : #include <boost/corosio/native/detail/reactor/reactor_service_state.hpp>
24 :
25 : #include <boost/corosio/native/detail/reactor/reactor_op_complete.hpp>
26 :
27 : #include <coroutine>
28 : #include <mutex>
29 : #include <utility>
30 :
31 : #include <errno.h>
32 : #include <fcntl.h>
33 : #include <netinet/in.h>
34 : #include <netinet/tcp.h>
35 : #include <sys/select.h>
36 : #include <sys/socket.h>
37 : #include <unistd.h>
38 :
39 : /*
40 : Each I/O op tries the syscall speculatively; only registers with
41 : the reactor on EAGAIN. Fd is registered once at open time and
42 : stays registered until close. The reactor only marks ready_events_;
43 : actual I/O happens in invoke_deferred_io(). cancel() captures
44 : shared_from_this() into op.impl_ptr to keep the impl alive.
45 : */
46 :
47 : namespace boost::corosio::detail {
48 :
49 : /// State for select socket service.
50 : using select_socket_state =
51 : reactor_service_state<select_scheduler, select_socket>;
52 :
53 : /** select socket service implementation.
54 :
55 : Inherits from socket_service to enable runtime polymorphism.
56 : Uses key_type = socket_service for service lookup.
57 : */
58 : class BOOST_COROSIO_DECL select_socket_service final : public socket_service
59 : {
60 : public:
61 : explicit select_socket_service(capy::execution_context& ctx);
62 : ~select_socket_service() override;
63 :
64 : select_socket_service(select_socket_service const&) = delete;
65 : select_socket_service& operator=(select_socket_service const&) = delete;
66 :
67 : void shutdown() override;
68 :
69 : io_object::implementation* construct() override;
70 : void destroy(io_object::implementation*) override;
71 : void close(io_object::handle&) override;
72 : std::error_code open_socket(
73 : tcp_socket::implementation& impl,
74 : int family,
75 : int type,
76 : int protocol) override;
77 :
78 HIT 292886 : select_scheduler& scheduler() const noexcept
79 : {
80 292886 : return state_->sched_;
81 : }
82 : void post(scheduler_op* op);
83 : void work_started() noexcept;
84 : void work_finished() noexcept;
85 :
86 : private:
87 : std::unique_ptr<select_socket_state> state_;
88 : };
89 :
90 : inline void
91 MIS 0 : select_connect_op::cancel() noexcept
92 : {
93 0 : if (socket_impl_)
94 0 : socket_impl_->cancel_single_op(*this);
95 : else
96 0 : request_cancel();
97 0 : }
98 :
99 : inline void
100 HIT 94 : select_read_op::cancel() noexcept
101 : {
102 94 : if (socket_impl_)
103 94 : socket_impl_->cancel_single_op(*this);
104 : else
105 MIS 0 : request_cancel();
106 HIT 94 : }
107 :
108 : inline void
109 MIS 0 : select_write_op::cancel() noexcept
110 : {
111 0 : if (socket_impl_)
112 0 : socket_impl_->cancel_single_op(*this);
113 : else
114 0 : request_cancel();
115 0 : }
116 :
117 : inline void
118 HIT 50611 : select_op::operator()()
119 : {
120 50611 : complete_io_op(*this);
121 50611 : }
122 :
123 : inline void
124 3086 : select_connect_op::operator()()
125 : {
126 3086 : complete_connect_op(*this);
127 3086 : }
128 :
129 9278 : inline select_socket::select_socket(select_socket_service& svc) noexcept
130 9278 : : reactor_socket(svc)
131 : {
132 9278 : }
133 :
134 9278 : inline select_socket::~select_socket() = default;
135 :
136 : inline std::coroutine_handle<>
137 3086 : select_socket::connect(
138 : std::coroutine_handle<> h,
139 : capy::executor_ref ex,
140 : endpoint ep,
141 : std::stop_token token,
142 : std::error_code* ec)
143 : {
144 3086 : auto result = do_connect(h, ex, ep, token, ec);
145 : // Rebuild fd_sets so select() watches for writability
146 3086 : if (result == std::noop_coroutine())
147 3086 : svc_.scheduler().notify_reactor();
148 3086 : return result;
149 : }
150 :
151 : inline std::coroutine_handle<>
152 126277 : select_socket::read_some(
153 : std::coroutine_handle<> h,
154 : capy::executor_ref ex,
155 : buffer_param param,
156 : std::stop_token token,
157 : std::error_code* ec,
158 : std::size_t* bytes_out)
159 : {
160 126277 : return do_read_some(h, ex, param, token, ec, bytes_out);
161 : }
162 :
163 : inline std::coroutine_handle<>
164 126129 : select_socket::write_some(
165 : std::coroutine_handle<> h,
166 : capy::executor_ref ex,
167 : buffer_param param,
168 : std::stop_token token,
169 : std::error_code* ec,
170 : std::size_t* bytes_out)
171 : {
172 126129 : auto result = do_write_some(h, ex, param, token, ec, bytes_out);
173 : // Rebuild fd_sets so select() watches for writability
174 126129 : if (result == std::noop_coroutine())
175 25218 : svc_.scheduler().notify_reactor();
176 126129 : return result;
177 : }
178 :
179 : inline void
180 94 : select_socket::cancel() noexcept
181 : {
182 94 : do_cancel();
183 94 : }
184 :
185 : inline void
186 27842 : select_socket::close_socket() noexcept
187 : {
188 27842 : do_close_socket();
189 27842 : }
190 :
191 168 : inline select_socket_service::select_socket_service(
192 168 : capy::execution_context& ctx)
193 168 : : state_(
194 : std::make_unique<select_socket_state>(
195 168 : ctx.use_service<select_scheduler>()))
196 : {
197 168 : }
198 :
199 336 : inline select_socket_service::~select_socket_service() {}
200 :
201 : inline void
202 168 : select_socket_service::shutdown()
203 : {
204 168 : std::lock_guard lock(state_->mutex_);
205 :
206 168 : while (auto* impl = state_->impl_list_.pop_front())
207 MIS 0 : impl->close_socket();
208 :
209 : // Don't clear impl_ptrs_ here. The scheduler shuts down after us and
210 : // drains completed_ops_, calling destroy() on each queued op. Letting
211 : // ~state_ release the ptrs (during service destruction, after scheduler
212 : // shutdown) keeps every impl alive until all ops have been drained.
213 HIT 168 : }
214 :
215 : inline io_object::implementation*
216 9278 : select_socket_service::construct()
217 : {
218 9278 : auto impl = std::make_shared<select_socket>(*this);
219 9278 : auto* raw = impl.get();
220 :
221 : {
222 9278 : std::lock_guard lock(state_->mutex_);
223 9278 : state_->impl_ptrs_.emplace(raw, std::move(impl));
224 9278 : state_->impl_list_.push_back(raw);
225 9278 : }
226 :
227 9278 : return raw;
228 9278 : }
229 :
230 : inline void
231 9278 : select_socket_service::destroy(io_object::implementation* impl)
232 : {
233 9278 : auto* select_impl = static_cast<select_socket*>(impl);
234 9278 : select_impl->close_socket();
235 9278 : std::lock_guard lock(state_->mutex_);
236 9278 : state_->impl_list_.remove(select_impl);
237 9278 : state_->impl_ptrs_.erase(select_impl);
238 9278 : }
239 :
240 : inline std::error_code
241 3101 : select_socket_service::open_socket(
242 : tcp_socket::implementation& impl, int family, int type, int protocol)
243 : {
244 3101 : auto* select_impl = static_cast<select_socket*>(&impl);
245 3101 : select_impl->close_socket();
246 :
247 3101 : int fd = ::socket(family, type, protocol);
248 3101 : if (fd < 0)
249 MIS 0 : return make_err(errno);
250 :
251 HIT 3101 : if (family == AF_INET6)
252 : {
253 5 : int one = 1;
254 5 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
255 : }
256 :
257 3101 : int flags = ::fcntl(fd, F_GETFL, 0);
258 3101 : if (flags == -1)
259 : {
260 MIS 0 : int errn = errno;
261 0 : ::close(fd);
262 0 : return make_err(errn);
263 : }
264 HIT 3101 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
265 : {
266 MIS 0 : int errn = errno;
267 0 : ::close(fd);
268 0 : return make_err(errn);
269 : }
270 HIT 3101 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
271 : {
272 MIS 0 : int errn = errno;
273 0 : ::close(fd);
274 0 : return make_err(errn);
275 : }
276 :
277 HIT 3101 : if (fd >= FD_SETSIZE)
278 : {
279 MIS 0 : ::close(fd);
280 0 : return make_err(EMFILE);
281 : }
282 :
283 : #ifdef SO_NOSIGPIPE
284 : {
285 : int one = 1;
286 : ::setsockopt(fd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
287 : }
288 : #endif
289 :
290 HIT 3101 : select_impl->fd_ = fd;
291 :
292 3101 : select_impl->desc_state_.fd = fd;
293 : {
294 3101 : std::lock_guard lock(select_impl->desc_state_.mutex);
295 3101 : select_impl->desc_state_.read_op = nullptr;
296 3101 : select_impl->desc_state_.write_op = nullptr;
297 3101 : select_impl->desc_state_.connect_op = nullptr;
298 3101 : }
299 3101 : scheduler().register_descriptor(fd, &select_impl->desc_state_);
300 :
301 3101 : return {};
302 : }
303 :
304 : inline void
305 15463 : select_socket_service::close(io_object::handle& h)
306 : {
307 15463 : static_cast<select_socket*>(h.get())->close_socket();
308 15463 : }
309 :
310 : inline void
311 50563 : select_socket_service::post(scheduler_op* op)
312 : {
313 50563 : state_->sched_.post(op);
314 50563 : }
315 :
316 : inline void
317 3278 : select_socket_service::work_started() noexcept
318 : {
319 3278 : state_->sched_.work_started();
320 3278 : }
321 :
322 : inline void
323 144 : select_socket_service::work_finished() noexcept
324 : {
325 144 : state_->sched_.work_finished();
326 144 : }
327 :
328 : } // namespace boost::corosio::detail
329 :
330 : #endif // BOOST_COROSIO_HAS_SELECT
331 :
332 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
|