include/boost/capy/io/any_read_source.hpp

91.2% Lines (83/91) 92.0% Functions (23/25)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11 #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_array.hpp>
17 #include <boost/capy/buffers/buffer_param.hpp>
18 #include <boost/capy/concept/io_awaitable.hpp>
19 #include <boost/capy/concept/read_source.hpp>
20 #include <boost/capy/ex/io_env.hpp>
21 #include <boost/capy/io_result.hpp>
22 #include <boost/capy/io_task.hpp>
23
24 #include <concepts>
25 #include <coroutine>
26 #include <cstddef>
27 #include <exception>
28 #include <new>
29 #include <span>
30 #include <stop_token>
31 #include <system_error>
32 #include <utility>
33
34 namespace boost {
35 namespace capy {
36
37 /** Type-erased wrapper for any ReadSource.
38
39 This class provides type erasure for any type satisfying the
40 @ref ReadSource concept, enabling runtime polymorphism for
41 source read operations. It uses cached awaitable storage to achieve
42 zero steady-state allocation after construction.
43
44 The wrapper supports two construction modes:
45 - **Owning**: Pass by value to transfer ownership. The wrapper
46 allocates storage and owns the source.
47 - **Reference**: Pass a pointer to wrap without ownership. The
48 pointed-to source must outlive this wrapper.
49
50 @par Awaitable Preallocation
51 The constructor preallocates storage for the type-erased awaitable.
52 This reserves all virtual address space at server startup
53 so memory usage can be measured up front, rather than
54 allocating piecemeal as traffic arrives.
55
56 @par Immediate Completion
57 Operations complete immediately without suspending when the
58 buffer sequence is empty, or when the underlying source's
59 awaitable reports readiness via `await_ready`.
60
61 @par Thread Safety
62 Not thread-safe. Concurrent operations on the same wrapper
63 are undefined behavior.
64
65 @par Example
66 @code
67 // Owning - takes ownership of the source
68 any_read_source rs(some_source{args...});
69
70 // Reference - wraps without ownership
71 some_source source;
72 any_read_source rs(&source);
73
74 mutable_buffer buf(data, size);
75 auto [ec, n] = co_await rs.read(std::span(&buf, 1));
76 @endcode
77
78 @see any_read_stream, ReadSource
79 */
80 class any_read_source
81 {
82 struct vtable;
83 struct awaitable_ops;
84
85 template<ReadSource S>
86 struct vtable_for_impl;
87
88 void* source_ = nullptr;
89 vtable const* vt_ = nullptr;
90 void* cached_awaitable_ = nullptr;
91 void* storage_ = nullptr;
92 awaitable_ops const* active_ops_ = nullptr;
93
94 public:
95 /** Destructor.
96
97 Destroys the owned source (if any) and releases the cached
98 awaitable storage.
99 */
100 ~any_read_source();
101
102 /** Construct a default instance.
103
104 Constructs an empty wrapper. Operations on a default-constructed
105 wrapper result in undefined behavior.
106 */
107 any_read_source() = default;
108
109 /** Non-copyable.
110
111 The awaitable cache is per-instance and cannot be shared.
112 */
113 any_read_source(any_read_source const&) = delete;
114 any_read_source& operator=(any_read_source const&) = delete;
115
116 /** Construct by moving.
117
118 Transfers ownership of the wrapped source (if owned) and
119 cached awaitable storage from `other`. After the move, `other` is
120 in a default-constructed state.
121
122 @param other The wrapper to move from.
123 */
124 1x any_read_source(any_read_source&& other) noexcept
125 1x : source_(std::exchange(other.source_, nullptr))
126 1x , vt_(std::exchange(other.vt_, nullptr))
127 1x , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
128 1x , storage_(std::exchange(other.storage_, nullptr))
129 1x , active_ops_(std::exchange(other.active_ops_, nullptr))
130 {
131 1x }
132
133 /** Assign by moving.
134
135 Destroys any owned source and releases existing resources,
136 then transfers ownership from `other`.
137
138 @param other The wrapper to move from.
139 @return Reference to this wrapper.
140 */
141 any_read_source&
142 operator=(any_read_source&& other) noexcept;
143
144 /** Construct by taking ownership of a ReadSource.
145
146 Allocates storage and moves the source into this wrapper.
147 The wrapper owns the source and will destroy it.
148
149 @param s The source to take ownership of.
150 */
151 template<ReadSource S>
152 requires (!std::same_as<std::decay_t<S>, any_read_source>)
153 any_read_source(S s);
154
155 /** Construct by wrapping a ReadSource without ownership.
156
157 Wraps the given source by pointer. The source must remain
158 valid for the lifetime of this wrapper.
159
160 @param s Pointer to the source to wrap.
161 */
162 template<ReadSource S>
163 any_read_source(S* s);
164
165 /** Check if the wrapper contains a valid source.
166
167 @return `true` if wrapping a source, `false` if default-constructed
168 or moved-from.
169 */
170 bool
171 27x has_value() const noexcept
172 {
173 27x return source_ != nullptr;
174 }
175
176 /** Check if the wrapper contains a valid source.
177
178 @return `true` if wrapping a source, `false` if default-constructed
179 or moved-from.
180 */
181 explicit
182 8x operator bool() const noexcept
183 {
184 8x return has_value();
185 }
186
187 /** Initiate a partial read operation.
188
189 Reads one or more bytes into the provided buffer sequence.
190 May fill less than the full sequence.
191
192 @param buffers The buffer sequence to read into.
193
194 @return An awaitable yielding `(error_code,std::size_t)`.
195
196 @par Immediate Completion
197 The operation completes immediately without suspending
198 the calling coroutine when:
199 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
200 @li The underlying source's awaitable reports immediate
201 readiness via `await_ready`.
202
203 @note This is a partial operation and may not process the
204 entire buffer sequence. Use @ref read for guaranteed
205 complete transfer.
206
207 @par Preconditions
208 The wrapper must contain a valid source (`has_value() == true`).
209 The caller must not call this function again after a prior
210 call returned an error (including EOF).
211 */
212 template<MutableBufferSequence MB>
213 auto
214 read_some(MB buffers);
215
216 /** Initiate a complete read operation.
217
218 Reads data into the provided buffer sequence by forwarding
219 to the underlying source's `read` operation. Large buffer
220 sequences are processed in windows, with each window
221 forwarded as a separate `read` call to the underlying source.
222 The operation completes when the entire buffer sequence is
223 filled, end-of-file is reached, or an error occurs.
224
225 @param buffers The buffer sequence to read into.
226
227 @return An awaitable yielding `(error_code,std::size_t)`.
228
229 @par Immediate Completion
230 The operation completes immediately without suspending
231 the calling coroutine when:
232 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
233 @li The underlying source's `read` awaitable reports
234 immediate readiness via `await_ready`.
235
236 @par Postconditions
237 Exactly one of the following is true on return:
238 @li **Success**: `!ec` and `n == buffer_size(buffers)`.
239 The entire buffer was filled.
240 @li **End-of-stream or Error**: `ec` and `n` indicates
241 the number of bytes transferred before the failure.
242
243 @par Preconditions
244 The wrapper must contain a valid source (`has_value() == true`).
245 The caller must not call this function again after a prior
246 call returned an error (including EOF).
247 */
248 template<MutableBufferSequence MB>
249 io_task<std::size_t>
250 read(MB buffers);
251
252 protected:
253 /** Rebind to a new source after move.
254
255 Updates the internal pointer to reference a new source object.
256 Used by owning wrappers after move assignment when the owned
257 object has moved to a new location.
258
259 @param new_source The new source to bind to. Must be the same
260 type as the original source.
261
262 @note Terminates if called with a source of different type
263 than the original.
264 */
265 template<ReadSource S>
266 void
267 rebind(S& new_source) noexcept
268 {
269 if(vt_ != &vtable_for_impl<S>::value)
270 std::terminate();
271 source_ = &new_source;
272 }
273
274 private:
275 auto
276 read_(std::span<mutable_buffer const> buffers);
277 };
278
279 // ordered by call sequence for cache line coherence
280 struct any_read_source::awaitable_ops
281 {
282 bool (*await_ready)(void*);
283 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
284 io_result<std::size_t> (*await_resume)(void*);
285 void (*destroy)(void*) noexcept;
286 };
287
288 // ordered by call frequency for cache line coherence
289 struct any_read_source::vtable
290 {
291 awaitable_ops const* (*construct_read_some_awaitable)(
292 void* source,
293 void* storage,
294 std::span<mutable_buffer const> buffers);
295 awaitable_ops const* (*construct_read_awaitable)(
296 void* source,
297 void* storage,
298 std::span<mutable_buffer const> buffers);
299 std::size_t awaitable_size;
300 std::size_t awaitable_align;
301 void (*destroy)(void*) noexcept;
302 };
303
304 template<ReadSource S>
305 struct any_read_source::vtable_for_impl
306 {
307 using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
308 std::span<mutable_buffer const>{}));
309 using ReadAwaitable = decltype(std::declval<S&>().read(
310 std::span<mutable_buffer const>{}));
311
312 static void
313 6x do_destroy_impl(void* source) noexcept
314 {
315 6x static_cast<S*>(source)->~S();
316 6x }
317
318 static awaitable_ops const*
319 52x construct_read_some_awaitable_impl(
320 void* source,
321 void* storage,
322 std::span<mutable_buffer const> buffers)
323 {
324 52x auto& s = *static_cast<S*>(source);
325 52x ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
326
327 static constexpr awaitable_ops ops = {
328 +[](void* p) {
329 return static_cast<ReadSomeAwaitable*>(p)->await_ready();
330 },
331 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
332 return detail::call_await_suspend(
333 static_cast<ReadSomeAwaitable*>(p), h, env);
334 },
335 +[](void* p) {
336 return static_cast<ReadSomeAwaitable*>(p)->await_resume();
337 },
338 +[](void* p) noexcept {
339 static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
340 }
341 };
342 52x return &ops;
343 }
344
345 static awaitable_ops const*
346 116x construct_read_awaitable_impl(
347 void* source,
348 void* storage,
349 std::span<mutable_buffer const> buffers)
350 {
351 116x auto& s = *static_cast<S*>(source);
352 116x ::new(storage) ReadAwaitable(s.read(buffers));
353
354 static constexpr awaitable_ops ops = {
355 +[](void* p) {
356 return static_cast<ReadAwaitable*>(p)->await_ready();
357 },
358 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
359 return detail::call_await_suspend(
360 static_cast<ReadAwaitable*>(p), h, env);
361 },
362 +[](void* p) {
363 return static_cast<ReadAwaitable*>(p)->await_resume();
364 },
365 +[](void* p) noexcept {
366 static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
367 }
368 };
369 116x return &ops;
370 }
371
372 static constexpr std::size_t max_awaitable_size =
373 sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
374 ? sizeof(ReadSomeAwaitable)
375 : sizeof(ReadAwaitable);
376 static constexpr std::size_t max_awaitable_align =
377 alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
378 ? alignof(ReadSomeAwaitable)
379 : alignof(ReadAwaitable);
380
381 static constexpr vtable value = {
382 &construct_read_some_awaitable_impl,
383 &construct_read_awaitable_impl,
384 max_awaitable_size,
385 max_awaitable_align,
386 &do_destroy_impl
387 };
388 };
389
390 inline
391 145x any_read_source::~any_read_source()
392 {
393 145x if(storage_)
394 {
395 6x vt_->destroy(source_);
396 6x ::operator delete(storage_);
397 }
398 145x if(cached_awaitable_)
399 {
400 139x if(active_ops_)
401 1x active_ops_->destroy(cached_awaitable_);
402 139x ::operator delete(cached_awaitable_);
403 }
404 145x }
405
406 inline any_read_source&
407 4x any_read_source::operator=(any_read_source&& other) noexcept
408 {
409 4x if(this != &other)
410 {
411 3x if(storage_)
412 {
413 vt_->destroy(source_);
414 ::operator delete(storage_);
415 }
416 3x if(cached_awaitable_)
417 {
418 2x if(active_ops_)
419 1x active_ops_->destroy(cached_awaitable_);
420 2x ::operator delete(cached_awaitable_);
421 }
422 3x source_ = std::exchange(other.source_, nullptr);
423 3x vt_ = std::exchange(other.vt_, nullptr);
424 3x cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
425 3x storage_ = std::exchange(other.storage_, nullptr);
426 3x active_ops_ = std::exchange(other.active_ops_, nullptr);
427 }
428 4x return *this;
429 }
430
431 template<ReadSource S>
432 requires (!std::same_as<std::decay_t<S>, any_read_source>)
433 6x any_read_source::any_read_source(S s)
434 6x : vt_(&vtable_for_impl<S>::value)
435 {
436 struct guard {
437 any_read_source* self;
438 bool committed = false;
439 6x ~guard() {
440 6x if(!committed && self->storage_) {
441 self->vt_->destroy(self->source_);
442 ::operator delete(self->storage_);
443 self->storage_ = nullptr;
444 self->source_ = nullptr;
445 }
446 6x }
447 6x } g{this};
448
449 6x storage_ = ::operator new(sizeof(S));
450 6x source_ = ::new(storage_) S(std::move(s));
451
452 // Preallocate the awaitable storage
453 6x cached_awaitable_ = ::operator new(vt_->awaitable_size);
454
455 6x g.committed = true;
456 6x }
457
458 template<ReadSource S>
459 135x any_read_source::any_read_source(S* s)
460 135x : source_(s)
461 135x , vt_(&vtable_for_impl<S>::value)
462 {
463 // Preallocate the awaitable storage
464 135x cached_awaitable_ = ::operator new(vt_->awaitable_size);
465 135x }
466
467 template<MutableBufferSequence MB>
468 auto
469 54x any_read_source::read_some(MB buffers)
470 {
471 struct awaitable
472 {
473 any_read_source* self_;
474 mutable_buffer_array<detail::max_iovec_> ba_;
475
476 awaitable(any_read_source* self, MB const& buffers)
477 : self_(self)
478 , ba_(buffers)
479 {
480 }
481
482 bool
483 await_ready() const noexcept
484 {
485 return ba_.to_span().empty();
486 }
487
488 std::coroutine_handle<>
489 await_suspend(std::coroutine_handle<> h, io_env const* env)
490 {
491 self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
492 self_->source_,
493 self_->cached_awaitable_,
494 ba_.to_span());
495
496 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
497 return h;
498
499 return self_->active_ops_->await_suspend(
500 self_->cached_awaitable_, h, env);
501 }
502
503 io_result<std::size_t>
504 await_resume()
505 {
506 if(ba_.to_span().empty())
507 return {{}, 0};
508
509 struct guard {
510 any_read_source* self;
511 ~guard() {
512 self->active_ops_->destroy(self->cached_awaitable_);
513 self->active_ops_ = nullptr;
514 }
515 } g{self_};
516 return self_->active_ops_->await_resume(
517 self_->cached_awaitable_);
518 }
519 };
520 54x return awaitable(this, buffers);
521 }
522
523 inline auto
524 116x any_read_source::read_(std::span<mutable_buffer const> buffers)
525 {
526 struct awaitable
527 {
528 any_read_source* self_;
529 std::span<mutable_buffer const> buffers_;
530
531 bool
532 116x await_ready() const noexcept
533 {
534 116x return false;
535 }
536
537 std::coroutine_handle<>
538 116x await_suspend(std::coroutine_handle<> h, io_env const* env)
539 {
540 232x self_->active_ops_ = self_->vt_->construct_read_awaitable(
541 116x self_->source_,
542 116x self_->cached_awaitable_,
543 buffers_);
544
545 116x if(self_->active_ops_->await_ready(self_->cached_awaitable_))
546 116x return h;
547
548 return self_->active_ops_->await_suspend(
549 self_->cached_awaitable_, h, env);
550 }
551
552 io_result<std::size_t>
553 116x await_resume()
554 {
555 struct guard {
556 any_read_source* self;
557 116x ~guard() {
558 116x self->active_ops_->destroy(self->cached_awaitable_);
559 116x self->active_ops_ = nullptr;
560 116x }
561 116x } g{self_};
562 116x return self_->active_ops_->await_resume(
563 200x self_->cached_awaitable_);
564 116x }
565 };
566 116x return awaitable{this, buffers};
567 }
568
569 template<MutableBufferSequence MB>
570 io_task<std::size_t>
571 110x any_read_source::read(MB buffers)
572 {
573 buffer_param bp(buffers);
574 std::size_t total = 0;
575
576 for(;;)
577 {
578 auto bufs = bp.data();
579 if(bufs.empty())
580 break;
581
582 auto [ec, n] = co_await read_(bufs);
583 total += n;
584 if(ec)
585 co_return {ec, total};
586 bp.consume(n);
587 }
588
589 co_return {{}, total};
590 220x }
591
592 } // namespace capy
593 } // namespace boost
594
595 #endif
596