1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11  
#define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11  
#define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
18  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <boost/capy/concept/io_awaitable.hpp>
19  
#include <boost/capy/concept/read_source.hpp>
19  
#include <boost/capy/concept/read_source.hpp>
20  
#include <boost/capy/ex/io_env.hpp>
20  
#include <boost/capy/ex/io_env.hpp>
21  
#include <boost/capy/io_result.hpp>
21  
#include <boost/capy/io_result.hpp>
22  
#include <boost/capy/io_task.hpp>
22  
#include <boost/capy/io_task.hpp>
23  

23  

24  
#include <concepts>
24  
#include <concepts>
25  
#include <coroutine>
25  
#include <coroutine>
26  
#include <cstddef>
26  
#include <cstddef>
27  
#include <exception>
27  
#include <exception>
28  
#include <new>
28  
#include <new>
29  
#include <span>
29  
#include <span>
30  
#include <stop_token>
30  
#include <stop_token>
31  
#include <system_error>
31  
#include <system_error>
32  
#include <utility>
32  
#include <utility>
33  

33  

34  
namespace boost {
34  
namespace boost {
35  
namespace capy {
35  
namespace capy {
36  

36  

37  
/** Type-erased wrapper for any ReadSource.
37  
/** Type-erased wrapper for any ReadSource.
38  

38  

39  
    This class provides type erasure for any type satisfying the
39  
    This class provides type erasure for any type satisfying the
40  
    @ref ReadSource concept, enabling runtime polymorphism for
40  
    @ref ReadSource concept, enabling runtime polymorphism for
41  
    source read operations. It uses cached awaitable storage to achieve
41  
    source read operations. It uses cached awaitable storage to achieve
42  
    zero steady-state allocation after construction.
42  
    zero steady-state allocation after construction.
43  

43  

44  
    The wrapper supports two construction modes:
44  
    The wrapper supports two construction modes:
45  
    - **Owning**: Pass by value to transfer ownership. The wrapper
45  
    - **Owning**: Pass by value to transfer ownership. The wrapper
46  
      allocates storage and owns the source.
46  
      allocates storage and owns the source.
47  
    - **Reference**: Pass a pointer to wrap without ownership. The
47  
    - **Reference**: Pass a pointer to wrap without ownership. The
48  
      pointed-to source must outlive this wrapper.
48  
      pointed-to source must outlive this wrapper.
49  

49  

50  
    @par Awaitable Preallocation
50  
    @par Awaitable Preallocation
51  
    The constructor preallocates storage for the type-erased awaitable.
51  
    The constructor preallocates storage for the type-erased awaitable.
52  
    This reserves all virtual address space at server startup
52  
    This reserves all virtual address space at server startup
53  
    so memory usage can be measured up front, rather than
53  
    so memory usage can be measured up front, rather than
54  
    allocating piecemeal as traffic arrives.
54  
    allocating piecemeal as traffic arrives.
55  

55  

56  
    @par Immediate Completion
56  
    @par Immediate Completion
57  
    Operations complete immediately without suspending when the
57  
    Operations complete immediately without suspending when the
58  
    buffer sequence is empty, or when the underlying source's
58  
    buffer sequence is empty, or when the underlying source's
59  
    awaitable reports readiness via `await_ready`.
59  
    awaitable reports readiness via `await_ready`.
60  

60  

61  
    @par Thread Safety
61  
    @par Thread Safety
62  
    Not thread-safe. Concurrent operations on the same wrapper
62  
    Not thread-safe. Concurrent operations on the same wrapper
63  
    are undefined behavior.
63  
    are undefined behavior.
64  

64  

65  
    @par Example
65  
    @par Example
66  
    @code
66  
    @code
67  
    // Owning - takes ownership of the source
67  
    // Owning - takes ownership of the source
68  
    any_read_source rs(some_source{args...});
68  
    any_read_source rs(some_source{args...});
69  

69  

70  
    // Reference - wraps without ownership
70  
    // Reference - wraps without ownership
71  
    some_source source;
71  
    some_source source;
72  
    any_read_source rs(&source);
72  
    any_read_source rs(&source);
73  

73  

74  
    mutable_buffer buf(data, size);
74  
    mutable_buffer buf(data, size);
75  
    auto [ec, n] = co_await rs.read(std::span(&buf, 1));
75  
    auto [ec, n] = co_await rs.read(std::span(&buf, 1));
76  
    @endcode
76  
    @endcode
77  

77  

78  
    @see any_read_stream, ReadSource
78  
    @see any_read_stream, ReadSource
79  
*/
79  
*/
80  
class any_read_source
80  
class any_read_source
81  
{
81  
{
82  
    struct vtable;
82  
    struct vtable;
83  
    struct awaitable_ops;
83  
    struct awaitable_ops;
84  

84  

85  
    template<ReadSource S>
85  
    template<ReadSource S>
86  
    struct vtable_for_impl;
86  
    struct vtable_for_impl;
87  

87  

88  
    void* source_ = nullptr;
88  
    void* source_ = nullptr;
89  
    vtable const* vt_ = nullptr;
89  
    vtable const* vt_ = nullptr;
90  
    void* cached_awaitable_ = nullptr;
90  
    void* cached_awaitable_ = nullptr;
91  
    void* storage_ = nullptr;
91  
    void* storage_ = nullptr;
92  
    awaitable_ops const* active_ops_ = nullptr;
92  
    awaitable_ops const* active_ops_ = nullptr;
93  

93  

94  
public:
94  
public:
95  
    /** Destructor.
95  
    /** Destructor.
96  

96  

97  
        Destroys the owned source (if any) and releases the cached
97  
        Destroys the owned source (if any) and releases the cached
98  
        awaitable storage.
98  
        awaitable storage.
99  
    */
99  
    */
100  
    ~any_read_source();
100  
    ~any_read_source();
101  

101  

102  
    /** Construct a default instance.
102  
    /** Construct a default instance.
103  

103  

104  
        Constructs an empty wrapper. Operations on a default-constructed
104  
        Constructs an empty wrapper. Operations on a default-constructed
105  
        wrapper result in undefined behavior.
105  
        wrapper result in undefined behavior.
106  
    */
106  
    */
107  
    any_read_source() = default;
107  
    any_read_source() = default;
108  

108  

109  
    /** Non-copyable.
109  
    /** Non-copyable.
110  

110  

111  
        The awaitable cache is per-instance and cannot be shared.
111  
        The awaitable cache is per-instance and cannot be shared.
112  
    */
112  
    */
113  
    any_read_source(any_read_source const&) = delete;
113  
    any_read_source(any_read_source const&) = delete;
114  
    any_read_source& operator=(any_read_source const&) = delete;
114  
    any_read_source& operator=(any_read_source const&) = delete;
115  

115  

116  
    /** Construct by moving.
116  
    /** Construct by moving.
117  

117  

118  
        Transfers ownership of the wrapped source (if owned) and
118  
        Transfers ownership of the wrapped source (if owned) and
119  
        cached awaitable storage from `other`. After the move, `other` is
119  
        cached awaitable storage from `other`. After the move, `other` is
120  
        in a default-constructed state.
120  
        in a default-constructed state.
121  

121  

122  
        @param other The wrapper to move from.
122  
        @param other The wrapper to move from.
123  
    */
123  
    */
124  
    any_read_source(any_read_source&& other) noexcept
124  
    any_read_source(any_read_source&& other) noexcept
125  
        : source_(std::exchange(other.source_, nullptr))
125  
        : source_(std::exchange(other.source_, nullptr))
126  
        , vt_(std::exchange(other.vt_, nullptr))
126  
        , vt_(std::exchange(other.vt_, nullptr))
127  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
127  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
128  
        , storage_(std::exchange(other.storage_, nullptr))
128  
        , storage_(std::exchange(other.storage_, nullptr))
129  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
129  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
130  
    {
130  
    {
131  
    }
131  
    }
132  

132  

133  
    /** Assign by moving.
133  
    /** Assign by moving.
134  

134  

135  
        Destroys any owned source and releases existing resources,
135  
        Destroys any owned source and releases existing resources,
136  
        then transfers ownership from `other`.
136  
        then transfers ownership from `other`.
137  

137  

138  
        @param other The wrapper to move from.
138  
        @param other The wrapper to move from.
139  
        @return Reference to this wrapper.
139  
        @return Reference to this wrapper.
140  
    */
140  
    */
141  
    any_read_source&
141  
    any_read_source&
142  
    operator=(any_read_source&& other) noexcept;
142  
    operator=(any_read_source&& other) noexcept;
143  

143  

144  
    /** Construct by taking ownership of a ReadSource.
144  
    /** Construct by taking ownership of a ReadSource.
145  

145  

146  
        Allocates storage and moves the source into this wrapper.
146  
        Allocates storage and moves the source into this wrapper.
147  
        The wrapper owns the source and will destroy it.
147  
        The wrapper owns the source and will destroy it.
148  

148  

149  
        @param s The source to take ownership of.
149  
        @param s The source to take ownership of.
150  
    */
150  
    */
151  
    template<ReadSource S>
151  
    template<ReadSource S>
152  
        requires (!std::same_as<std::decay_t<S>, any_read_source>)
152  
        requires (!std::same_as<std::decay_t<S>, any_read_source>)
153  
    any_read_source(S s);
153  
    any_read_source(S s);
154  

154  

155  
    /** Construct by wrapping a ReadSource without ownership.
155  
    /** Construct by wrapping a ReadSource without ownership.
156  

156  

157  
        Wraps the given source by pointer. The source must remain
157  
        Wraps the given source by pointer. The source must remain
158  
        valid for the lifetime of this wrapper.
158  
        valid for the lifetime of this wrapper.
159  

159  

160  
        @param s Pointer to the source to wrap.
160  
        @param s Pointer to the source to wrap.
161  
    */
161  
    */
162  
    template<ReadSource S>
162  
    template<ReadSource S>
163  
    any_read_source(S* s);
163  
    any_read_source(S* s);
164  

164  

165  
    /** Check if the wrapper contains a valid source.
165  
    /** Check if the wrapper contains a valid source.
166  

166  

167  
        @return `true` if wrapping a source, `false` if default-constructed
167  
        @return `true` if wrapping a source, `false` if default-constructed
168  
            or moved-from.
168  
            or moved-from.
169  
    */
169  
    */
170  
    bool
170  
    bool
171  
    has_value() const noexcept
171  
    has_value() const noexcept
172  
    {
172  
    {
173  
        return source_ != nullptr;
173  
        return source_ != nullptr;
174  
    }
174  
    }
175  

175  

176  
    /** Check if the wrapper contains a valid source.
176  
    /** Check if the wrapper contains a valid source.
177  

177  

178  
        @return `true` if wrapping a source, `false` if default-constructed
178  
        @return `true` if wrapping a source, `false` if default-constructed
179  
            or moved-from.
179  
            or moved-from.
180  
    */
180  
    */
181  
    explicit
181  
    explicit
182  
    operator bool() const noexcept
182  
    operator bool() const noexcept
183  
    {
183  
    {
184  
        return has_value();
184  
        return has_value();
185  
    }
185  
    }
186  

186  

187  
    /** Initiate a partial read operation.
187  
    /** Initiate a partial read operation.
188  

188  

189  
        Reads one or more bytes into the provided buffer sequence.
189  
        Reads one or more bytes into the provided buffer sequence.
190  
        May fill less than the full sequence.
190  
        May fill less than the full sequence.
191  

191  

192  
        @param buffers The buffer sequence to read into.
192  
        @param buffers The buffer sequence to read into.
193  

193  

194  
        @return An awaitable yielding `(error_code,std::size_t)`.
194  
        @return An awaitable yielding `(error_code,std::size_t)`.
195  

195  

196  
        @par Immediate Completion
196  
        @par Immediate Completion
197  
        The operation completes immediately without suspending
197  
        The operation completes immediately without suspending
198  
        the calling coroutine when:
198  
        the calling coroutine when:
199  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
199  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
200  
        @li The underlying source's awaitable reports immediate
200  
        @li The underlying source's awaitable reports immediate
201  
            readiness via `await_ready`.
201  
            readiness via `await_ready`.
202  

202  

203  
        @note This is a partial operation and may not process the
203  
        @note This is a partial operation and may not process the
204  
        entire buffer sequence. Use @ref read for guaranteed
204  
        entire buffer sequence. Use @ref read for guaranteed
205  
        complete transfer.
205  
        complete transfer.
206  

206  

207  
        @par Preconditions
207  
        @par Preconditions
208  
        The wrapper must contain a valid source (`has_value() == true`).
208  
        The wrapper must contain a valid source (`has_value() == true`).
209  
        The caller must not call this function again after a prior
209  
        The caller must not call this function again after a prior
210  
        call returned an error (including EOF).
210  
        call returned an error (including EOF).
211  
    */
211  
    */
212  
    template<MutableBufferSequence MB>
212  
    template<MutableBufferSequence MB>
213  
    auto
213  
    auto
214  
    read_some(MB buffers);
214  
    read_some(MB buffers);
215  

215  

216  
    /** Initiate a complete read operation.
216  
    /** Initiate a complete read operation.
217  

217  

218  
        Reads data into the provided buffer sequence by forwarding
218  
        Reads data into the provided buffer sequence by forwarding
219  
        to the underlying source's `read` operation. Large buffer
219  
        to the underlying source's `read` operation. Large buffer
220  
        sequences are processed in windows, with each window
220  
        sequences are processed in windows, with each window
221  
        forwarded as a separate `read` call to the underlying source.
221  
        forwarded as a separate `read` call to the underlying source.
222  
        The operation completes when the entire buffer sequence is
222  
        The operation completes when the entire buffer sequence is
223  
        filled, end-of-file is reached, or an error occurs.
223  
        filled, end-of-file is reached, or an error occurs.
224  

224  

225  
        @param buffers The buffer sequence to read into.
225  
        @param buffers The buffer sequence to read into.
226  

226  

227  
        @return An awaitable yielding `(error_code,std::size_t)`.
227  
        @return An awaitable yielding `(error_code,std::size_t)`.
228  

228  

229  
        @par Immediate Completion
229  
        @par Immediate Completion
230  
        The operation completes immediately without suspending
230  
        The operation completes immediately without suspending
231  
        the calling coroutine when:
231  
        the calling coroutine when:
232  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
232  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
233  
        @li The underlying source's `read` awaitable reports
233  
        @li The underlying source's `read` awaitable reports
234  
            immediate readiness via `await_ready`.
234  
            immediate readiness via `await_ready`.
235  

235  

236  
        @par Postconditions
236  
        @par Postconditions
237  
        Exactly one of the following is true on return:
237  
        Exactly one of the following is true on return:
238  
        @li **Success**: `!ec` and `n == buffer_size(buffers)`.
238  
        @li **Success**: `!ec` and `n == buffer_size(buffers)`.
239  
            The entire buffer was filled.
239  
            The entire buffer was filled.
240  
        @li **End-of-stream or Error**: `ec` and `n` indicates
240  
        @li **End-of-stream or Error**: `ec` and `n` indicates
241  
            the number of bytes transferred before the failure.
241  
            the number of bytes transferred before the failure.
242  

242  

243  
        @par Preconditions
243  
        @par Preconditions
244  
        The wrapper must contain a valid source (`has_value() == true`).
244  
        The wrapper must contain a valid source (`has_value() == true`).
245  
        The caller must not call this function again after a prior
245  
        The caller must not call this function again after a prior
246  
        call returned an error (including EOF).
246  
        call returned an error (including EOF).
247  
    */
247  
    */
248  
    template<MutableBufferSequence MB>
248  
    template<MutableBufferSequence MB>
249  
    io_task<std::size_t>
249  
    io_task<std::size_t>
250  
    read(MB buffers);
250  
    read(MB buffers);
251  

251  

252  
protected:
252  
protected:
253  
    /** Rebind to a new source after move.
253  
    /** Rebind to a new source after move.
254  

254  

255  
        Updates the internal pointer to reference a new source object.
255  
        Updates the internal pointer to reference a new source object.
256  
        Used by owning wrappers after move assignment when the owned
256  
        Used by owning wrappers after move assignment when the owned
257  
        object has moved to a new location.
257  
        object has moved to a new location.
258  

258  

259  
        @param new_source The new source to bind to. Must be the same
259  
        @param new_source The new source to bind to. Must be the same
260  
            type as the original source.
260  
            type as the original source.
261  

261  

262  
        @note Terminates if called with a source of different type
262  
        @note Terminates if called with a source of different type
263  
            than the original.
263  
            than the original.
264  
    */
264  
    */
265  
    template<ReadSource S>
265  
    template<ReadSource S>
266  
    void
266  
    void
267  
    rebind(S& new_source) noexcept
267  
    rebind(S& new_source) noexcept
268  
    {
268  
    {
269  
        if(vt_ != &vtable_for_impl<S>::value)
269  
        if(vt_ != &vtable_for_impl<S>::value)
270  
            std::terminate();
270  
            std::terminate();
271  
        source_ = &new_source;
271  
        source_ = &new_source;
272  
    }
272  
    }
273  

273  

274  
private:
274  
private:
275  
    auto
275  
    auto
276  
    read_(std::span<mutable_buffer const> buffers);
276  
    read_(std::span<mutable_buffer const> buffers);
277  
};
277  
};
278  

278  

279  
// ordered by call sequence for cache line coherence
279  
// ordered by call sequence for cache line coherence
280  
struct any_read_source::awaitable_ops
280  
struct any_read_source::awaitable_ops
281  
{
281  
{
282  
    bool (*await_ready)(void*);
282  
    bool (*await_ready)(void*);
283  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
283  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
284  
    io_result<std::size_t> (*await_resume)(void*);
284  
    io_result<std::size_t> (*await_resume)(void*);
285  
    void (*destroy)(void*) noexcept;
285  
    void (*destroy)(void*) noexcept;
286  
};
286  
};
287  

287  

288  
// ordered by call frequency for cache line coherence
288  
// ordered by call frequency for cache line coherence
289  
struct any_read_source::vtable
289  
struct any_read_source::vtable
290  
{
290  
{
291  
    awaitable_ops const* (*construct_read_some_awaitable)(
291  
    awaitable_ops const* (*construct_read_some_awaitable)(
292  
        void* source,
292  
        void* source,
293  
        void* storage,
293  
        void* storage,
294  
        std::span<mutable_buffer const> buffers);
294  
        std::span<mutable_buffer const> buffers);
295  
    awaitable_ops const* (*construct_read_awaitable)(
295  
    awaitable_ops const* (*construct_read_awaitable)(
296  
        void* source,
296  
        void* source,
297  
        void* storage,
297  
        void* storage,
298  
        std::span<mutable_buffer const> buffers);
298  
        std::span<mutable_buffer const> buffers);
299  
    std::size_t awaitable_size;
299  
    std::size_t awaitable_size;
300  
    std::size_t awaitable_align;
300  
    std::size_t awaitable_align;
301  
    void (*destroy)(void*) noexcept;
301  
    void (*destroy)(void*) noexcept;
302  
};
302  
};
303  

303  

304  
template<ReadSource S>
304  
template<ReadSource S>
305  
struct any_read_source::vtable_for_impl
305  
struct any_read_source::vtable_for_impl
306  
{
306  
{
307  
    using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
307  
    using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
308  
        std::span<mutable_buffer const>{}));
308  
        std::span<mutable_buffer const>{}));
309  
    using ReadAwaitable = decltype(std::declval<S&>().read(
309  
    using ReadAwaitable = decltype(std::declval<S&>().read(
310  
        std::span<mutable_buffer const>{}));
310  
        std::span<mutable_buffer const>{}));
311  

311  

312  
    static void
312  
    static void
313  
    do_destroy_impl(void* source) noexcept
313  
    do_destroy_impl(void* source) noexcept
314  
    {
314  
    {
315  
        static_cast<S*>(source)->~S();
315  
        static_cast<S*>(source)->~S();
316  
    }
316  
    }
317  

317  

318  
    static awaitable_ops const*
318  
    static awaitable_ops const*
319  
    construct_read_some_awaitable_impl(
319  
    construct_read_some_awaitable_impl(
320  
        void* source,
320  
        void* source,
321  
        void* storage,
321  
        void* storage,
322  
        std::span<mutable_buffer const> buffers)
322  
        std::span<mutable_buffer const> buffers)
323  
    {
323  
    {
324  
        auto& s = *static_cast<S*>(source);
324  
        auto& s = *static_cast<S*>(source);
325  
        ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
325  
        ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
326  

326  

327  
        static constexpr awaitable_ops ops = {
327  
        static constexpr awaitable_ops ops = {
328  
            +[](void* p) {
328  
            +[](void* p) {
329  
                return static_cast<ReadSomeAwaitable*>(p)->await_ready();
329  
                return static_cast<ReadSomeAwaitable*>(p)->await_ready();
330  
            },
330  
            },
331  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
331  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
332  
                return detail::call_await_suspend(
332  
                return detail::call_await_suspend(
333  
                    static_cast<ReadSomeAwaitable*>(p), h, env);
333  
                    static_cast<ReadSomeAwaitable*>(p), h, env);
334  
            },
334  
            },
335  
            +[](void* p) {
335  
            +[](void* p) {
336  
                return static_cast<ReadSomeAwaitable*>(p)->await_resume();
336  
                return static_cast<ReadSomeAwaitable*>(p)->await_resume();
337  
            },
337  
            },
338  
            +[](void* p) noexcept {
338  
            +[](void* p) noexcept {
339  
                static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
339  
                static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
340  
            }
340  
            }
341  
        };
341  
        };
342  
        return &ops;
342  
        return &ops;
343  
    }
343  
    }
344  

344  

345  
    static awaitable_ops const*
345  
    static awaitable_ops const*
346  
    construct_read_awaitable_impl(
346  
    construct_read_awaitable_impl(
347  
        void* source,
347  
        void* source,
348  
        void* storage,
348  
        void* storage,
349  
        std::span<mutable_buffer const> buffers)
349  
        std::span<mutable_buffer const> buffers)
350  
    {
350  
    {
351  
        auto& s = *static_cast<S*>(source);
351  
        auto& s = *static_cast<S*>(source);
352  
        ::new(storage) ReadAwaitable(s.read(buffers));
352  
        ::new(storage) ReadAwaitable(s.read(buffers));
353  

353  

354  
        static constexpr awaitable_ops ops = {
354  
        static constexpr awaitable_ops ops = {
355  
            +[](void* p) {
355  
            +[](void* p) {
356  
                return static_cast<ReadAwaitable*>(p)->await_ready();
356  
                return static_cast<ReadAwaitable*>(p)->await_ready();
357  
            },
357  
            },
358  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
358  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
359  
                return detail::call_await_suspend(
359  
                return detail::call_await_suspend(
360  
                    static_cast<ReadAwaitable*>(p), h, env);
360  
                    static_cast<ReadAwaitable*>(p), h, env);
361  
            },
361  
            },
362  
            +[](void* p) {
362  
            +[](void* p) {
363  
                return static_cast<ReadAwaitable*>(p)->await_resume();
363  
                return static_cast<ReadAwaitable*>(p)->await_resume();
364  
            },
364  
            },
365  
            +[](void* p) noexcept {
365  
            +[](void* p) noexcept {
366  
                static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
366  
                static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
367  
            }
367  
            }
368  
        };
368  
        };
369  
        return &ops;
369  
        return &ops;
370  
    }
370  
    }
371  

371  

372  
    static constexpr std::size_t max_awaitable_size =
372  
    static constexpr std::size_t max_awaitable_size =
373  
        sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
373  
        sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
374  
            ? sizeof(ReadSomeAwaitable)
374  
            ? sizeof(ReadSomeAwaitable)
375  
            : sizeof(ReadAwaitable);
375  
            : sizeof(ReadAwaitable);
376  
    static constexpr std::size_t max_awaitable_align =
376  
    static constexpr std::size_t max_awaitable_align =
377  
        alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
377  
        alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
378  
            ? alignof(ReadSomeAwaitable)
378  
            ? alignof(ReadSomeAwaitable)
379  
            : alignof(ReadAwaitable);
379  
            : alignof(ReadAwaitable);
380  

380  

381  
    static constexpr vtable value = {
381  
    static constexpr vtable value = {
382  
        &construct_read_some_awaitable_impl,
382  
        &construct_read_some_awaitable_impl,
383  
        &construct_read_awaitable_impl,
383  
        &construct_read_awaitable_impl,
384  
        max_awaitable_size,
384  
        max_awaitable_size,
385  
        max_awaitable_align,
385  
        max_awaitable_align,
386  
        &do_destroy_impl
386  
        &do_destroy_impl
387  
    };
387  
    };
388  
};
388  
};
389  

389  

390  
inline
390  
inline
391  
any_read_source::~any_read_source()
391  
any_read_source::~any_read_source()
392  
{
392  
{
393  
    if(storage_)
393  
    if(storage_)
394  
    {
394  
    {
395  
        vt_->destroy(source_);
395  
        vt_->destroy(source_);
396  
        ::operator delete(storage_);
396  
        ::operator delete(storage_);
397  
    }
397  
    }
398  
    if(cached_awaitable_)
398  
    if(cached_awaitable_)
399  
    {
399  
    {
400  
        if(active_ops_)
400  
        if(active_ops_)
401  
            active_ops_->destroy(cached_awaitable_);
401  
            active_ops_->destroy(cached_awaitable_);
402  
        ::operator delete(cached_awaitable_);
402  
        ::operator delete(cached_awaitable_);
403  
    }
403  
    }
404  
}
404  
}
405  

405  

406  
inline any_read_source&
406  
inline any_read_source&
407  
any_read_source::operator=(any_read_source&& other) noexcept
407  
any_read_source::operator=(any_read_source&& other) noexcept
408  
{
408  
{
409  
    if(this != &other)
409  
    if(this != &other)
410  
    {
410  
    {
411  
        if(storage_)
411  
        if(storage_)
412  
        {
412  
        {
413  
            vt_->destroy(source_);
413  
            vt_->destroy(source_);
414  
            ::operator delete(storage_);
414  
            ::operator delete(storage_);
415  
        }
415  
        }
416  
        if(cached_awaitable_)
416  
        if(cached_awaitable_)
417  
        {
417  
        {
418  
            if(active_ops_)
418  
            if(active_ops_)
419  
                active_ops_->destroy(cached_awaitable_);
419  
                active_ops_->destroy(cached_awaitable_);
420  
            ::operator delete(cached_awaitable_);
420  
            ::operator delete(cached_awaitable_);
421  
        }
421  
        }
422  
        source_ = std::exchange(other.source_, nullptr);
422  
        source_ = std::exchange(other.source_, nullptr);
423  
        vt_ = std::exchange(other.vt_, nullptr);
423  
        vt_ = std::exchange(other.vt_, nullptr);
424  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
424  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
425  
        storage_ = std::exchange(other.storage_, nullptr);
425  
        storage_ = std::exchange(other.storage_, nullptr);
426  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
426  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
427  
    }
427  
    }
428  
    return *this;
428  
    return *this;
429  
}
429  
}
430  

430  

431  
template<ReadSource S>
431  
template<ReadSource S>
432  
    requires (!std::same_as<std::decay_t<S>, any_read_source>)
432  
    requires (!std::same_as<std::decay_t<S>, any_read_source>)
433  
any_read_source::any_read_source(S s)
433  
any_read_source::any_read_source(S s)
434  
    : vt_(&vtable_for_impl<S>::value)
434  
    : vt_(&vtable_for_impl<S>::value)
435  
{
435  
{
436  
    struct guard {
436  
    struct guard {
437  
        any_read_source* self;
437  
        any_read_source* self;
438  
        bool committed = false;
438  
        bool committed = false;
439  
        ~guard() {
439  
        ~guard() {
440  
            if(!committed && self->storage_) {
440  
            if(!committed && self->storage_) {
441  
                self->vt_->destroy(self->source_);
441  
                self->vt_->destroy(self->source_);
442  
                ::operator delete(self->storage_);
442  
                ::operator delete(self->storage_);
443  
                self->storage_ = nullptr;
443  
                self->storage_ = nullptr;
444  
                self->source_ = nullptr;
444  
                self->source_ = nullptr;
445  
            }
445  
            }
446  
        }
446  
        }
447  
    } g{this};
447  
    } g{this};
448  

448  

449  
    storage_ = ::operator new(sizeof(S));
449  
    storage_ = ::operator new(sizeof(S));
450  
    source_ = ::new(storage_) S(std::move(s));
450  
    source_ = ::new(storage_) S(std::move(s));
451  

451  

452  
    // Preallocate the awaitable storage
452  
    // Preallocate the awaitable storage
453  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
453  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
454  

454  

455  
    g.committed = true;
455  
    g.committed = true;
456  
}
456  
}
457  

457  

458  
template<ReadSource S>
458  
template<ReadSource S>
459  
any_read_source::any_read_source(S* s)
459  
any_read_source::any_read_source(S* s)
460  
    : source_(s)
460  
    : source_(s)
461  
    , vt_(&vtable_for_impl<S>::value)
461  
    , vt_(&vtable_for_impl<S>::value)
462  
{
462  
{
463  
    // Preallocate the awaitable storage
463  
    // Preallocate the awaitable storage
464  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
464  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
465  
}
465  
}
466  

466  

467  
template<MutableBufferSequence MB>
467  
template<MutableBufferSequence MB>
468  
auto
468  
auto
469  
any_read_source::read_some(MB buffers)
469  
any_read_source::read_some(MB buffers)
470  
{
470  
{
471  
    struct awaitable
471  
    struct awaitable
472  
    {
472  
    {
473  
        any_read_source* self_;
473  
        any_read_source* self_;
474  
        mutable_buffer_array<detail::max_iovec_> ba_;
474  
        mutable_buffer_array<detail::max_iovec_> ba_;
475  

475  

476  
        awaitable(any_read_source* self, MB const& buffers)
476  
        awaitable(any_read_source* self, MB const& buffers)
477  
            : self_(self)
477  
            : self_(self)
478  
            , ba_(buffers)
478  
            , ba_(buffers)
479  
        {
479  
        {
480  
        }
480  
        }
481  

481  

482  
        bool
482  
        bool
483  
        await_ready() const noexcept
483  
        await_ready() const noexcept
484  
        {
484  
        {
485  
            return ba_.to_span().empty();
485  
            return ba_.to_span().empty();
486  
        }
486  
        }
487  

487  

488  
        std::coroutine_handle<>
488  
        std::coroutine_handle<>
489  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
489  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
490  
        {
490  
        {
491  
            self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
491  
            self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
492  
                self_->source_,
492  
                self_->source_,
493  
                self_->cached_awaitable_,
493  
                self_->cached_awaitable_,
494  
                ba_.to_span());
494  
                ba_.to_span());
495  

495  

496  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
496  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
497  
                return h;
497  
                return h;
498  

498  

499  
            return self_->active_ops_->await_suspend(
499  
            return self_->active_ops_->await_suspend(
500  
                self_->cached_awaitable_, h, env);
500  
                self_->cached_awaitable_, h, env);
501  
        }
501  
        }
502  

502  

503  
        io_result<std::size_t>
503  
        io_result<std::size_t>
504  
        await_resume()
504  
        await_resume()
505  
        {
505  
        {
506  
            if(ba_.to_span().empty())
506  
            if(ba_.to_span().empty())
507  
                return {{}, 0};
507  
                return {{}, 0};
508  

508  

509  
            struct guard {
509  
            struct guard {
510  
                any_read_source* self;
510  
                any_read_source* self;
511  
                ~guard() {
511  
                ~guard() {
512  
                    self->active_ops_->destroy(self->cached_awaitable_);
512  
                    self->active_ops_->destroy(self->cached_awaitable_);
513  
                    self->active_ops_ = nullptr;
513  
                    self->active_ops_ = nullptr;
514  
                }
514  
                }
515  
            } g{self_};
515  
            } g{self_};
516  
            return self_->active_ops_->await_resume(
516  
            return self_->active_ops_->await_resume(
517  
                self_->cached_awaitable_);
517  
                self_->cached_awaitable_);
518  
        }
518  
        }
519  
    };
519  
    };
520  
    return awaitable(this, buffers);
520  
    return awaitable(this, buffers);
521  
}
521  
}
522  

522  

523  
inline auto
523  
inline auto
524  
any_read_source::read_(std::span<mutable_buffer const> buffers)
524  
any_read_source::read_(std::span<mutable_buffer const> buffers)
525  
{
525  
{
526  
    struct awaitable
526  
    struct awaitable
527  
    {
527  
    {
528  
        any_read_source* self_;
528  
        any_read_source* self_;
529  
        std::span<mutable_buffer const> buffers_;
529  
        std::span<mutable_buffer const> buffers_;
530  

530  

531  
        bool
531  
        bool
532  
        await_ready() const noexcept
532  
        await_ready() const noexcept
533  
        {
533  
        {
534  
            return false;
534  
            return false;
535  
        }
535  
        }
536  

536  

537  
        std::coroutine_handle<>
537  
        std::coroutine_handle<>
538  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
538  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
539  
        {
539  
        {
540  
            self_->active_ops_ = self_->vt_->construct_read_awaitable(
540  
            self_->active_ops_ = self_->vt_->construct_read_awaitable(
541  
                self_->source_,
541  
                self_->source_,
542  
                self_->cached_awaitable_,
542  
                self_->cached_awaitable_,
543  
                buffers_);
543  
                buffers_);
544  

544  

545  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
545  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
546  
                return h;
546  
                return h;
547  

547  

548  
            return self_->active_ops_->await_suspend(
548  
            return self_->active_ops_->await_suspend(
549  
                self_->cached_awaitable_, h, env);
549  
                self_->cached_awaitable_, h, env);
550  
        }
550  
        }
551  

551  

552  
        io_result<std::size_t>
552  
        io_result<std::size_t>
553  
        await_resume()
553  
        await_resume()
554  
        {
554  
        {
555  
            struct guard {
555  
            struct guard {
556  
                any_read_source* self;
556  
                any_read_source* self;
557  
                ~guard() {
557  
                ~guard() {
558  
                    self->active_ops_->destroy(self->cached_awaitable_);
558  
                    self->active_ops_->destroy(self->cached_awaitable_);
559  
                    self->active_ops_ = nullptr;
559  
                    self->active_ops_ = nullptr;
560  
                }
560  
                }
561  
            } g{self_};
561  
            } g{self_};
562  
            return self_->active_ops_->await_resume(
562  
            return self_->active_ops_->await_resume(
563  
                self_->cached_awaitable_);
563  
                self_->cached_awaitable_);
564  
        }
564  
        }
565  
    };
565  
    };
566  
    return awaitable{this, buffers};
566  
    return awaitable{this, buffers};
567  
}
567  
}
568  

568  

569  
template<MutableBufferSequence MB>
569  
template<MutableBufferSequence MB>
570  
io_task<std::size_t>
570  
io_task<std::size_t>
571  
any_read_source::read(MB buffers)
571  
any_read_source::read(MB buffers)
572  
{
572  
{
573  
    buffer_param bp(buffers);
573  
    buffer_param bp(buffers);
574  
    std::size_t total = 0;
574  
    std::size_t total = 0;
575  

575  

576  
    for(;;)
576  
    for(;;)
577  
    {
577  
    {
578  
        auto bufs = bp.data();
578  
        auto bufs = bp.data();
579  
        if(bufs.empty())
579  
        if(bufs.empty())
580  
            break;
580  
            break;
581  

581  

582  
        auto [ec, n] = co_await read_(bufs);
582  
        auto [ec, n] = co_await read_(bufs);
583  
        total += n;
583  
        total += n;
584  
        if(ec)
584  
        if(ec)
585  
            co_return {ec, total};
585  
            co_return {ec, total};
586  
        bp.consume(n);
586  
        bp.consume(n);
587  
    }
587  
    }
588  

588  

589  
    co_return {{}, total};
589  
    co_return {{}, total};
590  
}
590  
}
591  

591  

592  
} // namespace capy
592  
} // namespace capy
593  
} // namespace boost
593  
} // namespace boost
594  

594  

595  
#endif
595  
#endif