//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11  
#define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11  
#define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <boost/capy/concept/read_stream.hpp>
18  
#include <boost/capy/concept/read_stream.hpp>
19  
#include <boost/capy/ex/io_env.hpp>
19  
#include <boost/capy/ex/io_env.hpp>
20  
#include <boost/capy/io_result.hpp>
20  
#include <boost/capy/io_result.hpp>
21  

21  

22  
#include <concepts>
22  
#include <concepts>
23  
#include <coroutine>
23  
#include <coroutine>
24  
#include <cstddef>
24  
#include <cstddef>
25  
#include <exception>
25  
#include <exception>
26  
#include <new>
26  
#include <new>
27  
#include <span>
27  
#include <span>
28  
#include <stop_token>
28  
#include <stop_token>
29  
#include <system_error>
29  
#include <system_error>
30  
#include <utility>
30  
#include <utility>
31  

31  

32  
namespace boost {
32  
namespace boost {
33  
namespace capy {
33  
namespace capy {
34  

34  

35  
/** Type-erased wrapper for any ReadStream.
35  
/** Type-erased wrapper for any ReadStream.
36  

36  

37  
    This class provides type erasure for any type satisfying the
37  
    This class provides type erasure for any type satisfying the
38  
    @ref ReadStream concept, enabling runtime polymorphism for
38  
    @ref ReadStream concept, enabling runtime polymorphism for
39  
    read operations. It uses cached awaitable storage to achieve
39  
    read operations. It uses cached awaitable storage to achieve
40  
    zero steady-state allocation after construction.
40  
    zero steady-state allocation after construction.
41  

41  

42  
    The wrapper supports two construction modes:
42  
    The wrapper supports two construction modes:
43  
    - **Owning**: Pass by value to transfer ownership. The wrapper
43  
    - **Owning**: Pass by value to transfer ownership. The wrapper
44  
      allocates storage and owns the stream.
44  
      allocates storage and owns the stream.
45  
    - **Reference**: Pass a pointer to wrap without ownership. The
45  
    - **Reference**: Pass a pointer to wrap without ownership. The
46  
      pointed-to stream must outlive this wrapper.
46  
      pointed-to stream must outlive this wrapper.
47  

47  

48  
    @par Awaitable Preallocation
48  
    @par Awaitable Preallocation
49  
    The constructor preallocates storage for the type-erased awaitable.
49  
    The constructor preallocates storage for the type-erased awaitable.
50  
    This reserves all virtual address space at server startup
50  
    This reserves all virtual address space at server startup
51  
    so memory usage can be measured up front, rather than
51  
    so memory usage can be measured up front, rather than
52  
    allocating piecemeal as traffic arrives.
52  
    allocating piecemeal as traffic arrives.
53  

53  

54  
    @par Immediate Completion
54  
    @par Immediate Completion
55  
    When the underlying stream's awaitable reports ready immediately
55  
    When the underlying stream's awaitable reports ready immediately
56  
    (e.g. buffered data already available), the wrapper skips
56  
    (e.g. buffered data already available), the wrapper skips
57  
    coroutine suspension entirely and returns the result inline.
57  
    coroutine suspension entirely and returns the result inline.
58  

58  

59  
    @par Thread Safety
59  
    @par Thread Safety
60  
    Not thread-safe. Concurrent operations on the same wrapper
60  
    Not thread-safe. Concurrent operations on the same wrapper
61  
    are undefined behavior.
61  
    are undefined behavior.
62  

62  

63  
    @par Example
63  
    @par Example
64  
    @code
64  
    @code
65  
    // Owning - takes ownership of the stream
65  
    // Owning - takes ownership of the stream
66  
    any_read_stream stream(socket{ioc});
66  
    any_read_stream stream(socket{ioc});
67  

67  

68  
    // Reference - wraps without ownership
68  
    // Reference - wraps without ownership
69  
    socket sock(ioc);
69  
    socket sock(ioc);
70  
    any_read_stream stream(&sock);
70  
    any_read_stream stream(&sock);
71  

71  

72  
    mutable_buffer buf(data, size);
72  
    mutable_buffer buf(data, size);
73  
    auto [ec, n] = co_await stream.read_some(buf);
73  
    auto [ec, n] = co_await stream.read_some(buf);
74  
    @endcode
74  
    @endcode
75  

75  

76  
    @see any_write_stream, any_stream, ReadStream
76  
    @see any_write_stream, any_stream, ReadStream
77  
*/
77  
*/
78  
class any_read_stream
78  
class any_read_stream
79  
{
79  
{
80  
    struct vtable;
80  
    struct vtable;
81  

81  

82  
    template<ReadStream S>
82  
    template<ReadStream S>
83  
    struct vtable_for_impl;
83  
    struct vtable_for_impl;
84  

84  

85  
    // ordered for cache line coherence
85  
    // ordered for cache line coherence
86  
    void* stream_ = nullptr;
86  
    void* stream_ = nullptr;
87  
    vtable const* vt_ = nullptr;
87  
    vtable const* vt_ = nullptr;
88  
    void* cached_awaitable_ = nullptr;
88  
    void* cached_awaitable_ = nullptr;
89  
    void* storage_ = nullptr;
89  
    void* storage_ = nullptr;
90  
    bool awaitable_active_ = false;
90  
    bool awaitable_active_ = false;
91  

91  

92  
public:
92  
public:
93  
    /** Destructor.
93  
    /** Destructor.
94  

94  

95  
        Destroys the owned stream (if any) and releases the cached
95  
        Destroys the owned stream (if any) and releases the cached
96  
        awaitable storage.
96  
        awaitable storage.
97  
    */
97  
    */
98  
    ~any_read_stream();
98  
    ~any_read_stream();
99  

99  

100  
    /** Construct a default instance.
100  
    /** Construct a default instance.
101  

101  

102  
        Constructs an empty wrapper. Operations on a default-constructed
102  
        Constructs an empty wrapper. Operations on a default-constructed
103  
        wrapper result in undefined behavior.
103  
        wrapper result in undefined behavior.
104  
    */
104  
    */
105  
    any_read_stream() = default;
105  
    any_read_stream() = default;
106  

106  

107  
    /** Non-copyable.
107  
    /** Non-copyable.
108  

108  

109  
        The awaitable cache is per-instance and cannot be shared.
109  
        The awaitable cache is per-instance and cannot be shared.
110  
    */
110  
    */
111  
    any_read_stream(any_read_stream const&) = delete;
111  
    any_read_stream(any_read_stream const&) = delete;
112  
    any_read_stream& operator=(any_read_stream const&) = delete;
112  
    any_read_stream& operator=(any_read_stream const&) = delete;
113  

113  

114  
    /** Construct by moving.
114  
    /** Construct by moving.
115  

115  

116  
        Transfers ownership of the wrapped stream (if owned) and
116  
        Transfers ownership of the wrapped stream (if owned) and
117  
        cached awaitable storage from `other`. After the move, `other` is
117  
        cached awaitable storage from `other`. After the move, `other` is
118  
        in a default-constructed state.
118  
        in a default-constructed state.
119  

119  

120  
        @param other The wrapper to move from.
120  
        @param other The wrapper to move from.
121  
    */
121  
    */
122  
    any_read_stream(any_read_stream&& other) noexcept
122  
    any_read_stream(any_read_stream&& other) noexcept
123  
        : stream_(std::exchange(other.stream_, nullptr))
123  
        : stream_(std::exchange(other.stream_, nullptr))
124  
        , vt_(std::exchange(other.vt_, nullptr))
124  
        , vt_(std::exchange(other.vt_, nullptr))
125  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
125  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
126  
        , storage_(std::exchange(other.storage_, nullptr))
126  
        , storage_(std::exchange(other.storage_, nullptr))
127  
        , awaitable_active_(std::exchange(other.awaitable_active_, false))
127  
        , awaitable_active_(std::exchange(other.awaitable_active_, false))
128  
    {
128  
    {
129  
    }
129  
    }
130  

130  

131  
    /** Assign by moving.
131  
    /** Assign by moving.
132  

132  

133  
        Destroys any owned stream and releases existing resources,
133  
        Destroys any owned stream and releases existing resources,
134  
        then transfers ownership from `other`.
134  
        then transfers ownership from `other`.
135  

135  

136  
        @param other The wrapper to move from.
136  
        @param other The wrapper to move from.
137  
        @return Reference to this wrapper.
137  
        @return Reference to this wrapper.
138  
    */
138  
    */
139  
    any_read_stream&
139  
    any_read_stream&
140  
    operator=(any_read_stream&& other) noexcept;
140  
    operator=(any_read_stream&& other) noexcept;
141  

141  

142  
    /** Construct by taking ownership of a ReadStream.
142  
    /** Construct by taking ownership of a ReadStream.
143  

143  

144  
        Allocates storage and moves the stream into this wrapper.
144  
        Allocates storage and moves the stream into this wrapper.
145  
        The wrapper owns the stream and will destroy it.
145  
        The wrapper owns the stream and will destroy it.
146  

146  

147  
        @param s The stream to take ownership of.
147  
        @param s The stream to take ownership of.
148  
    */
148  
    */
149  
    template<ReadStream S>
149  
    template<ReadStream S>
150  
        requires (!std::same_as<std::decay_t<S>, any_read_stream>)
150  
        requires (!std::same_as<std::decay_t<S>, any_read_stream>)
151  
    any_read_stream(S s);
151  
    any_read_stream(S s);
152  

152  

153  
    /** Construct by wrapping a ReadStream without ownership.
153  
    /** Construct by wrapping a ReadStream without ownership.
154  

154  

155  
        Wraps the given stream by pointer. The stream must remain
155  
        Wraps the given stream by pointer. The stream must remain
156  
        valid for the lifetime of this wrapper.
156  
        valid for the lifetime of this wrapper.
157  

157  

158  
        @param s Pointer to the stream to wrap.
158  
        @param s Pointer to the stream to wrap.
159  
    */
159  
    */
160  
    template<ReadStream S>
160  
    template<ReadStream S>
161  
    any_read_stream(S* s);
161  
    any_read_stream(S* s);
162  

162  

163  
    /** Check if the wrapper contains a valid stream.
163  
    /** Check if the wrapper contains a valid stream.
164  

164  

165  
        @return `true` if wrapping a stream, `false` if default-constructed
165  
        @return `true` if wrapping a stream, `false` if default-constructed
166  
            or moved-from.
166  
            or moved-from.
167  
    */
167  
    */
168  
    bool
168  
    bool
169  
    has_value() const noexcept
169  
    has_value() const noexcept
170  
    {
170  
    {
171  
        return stream_ != nullptr;
171  
        return stream_ != nullptr;
172  
    }
172  
    }
173  

173  

174  
    /** Check if the wrapper contains a valid stream.
174  
    /** Check if the wrapper contains a valid stream.
175  

175  

176  
        @return `true` if wrapping a stream, `false` if default-constructed
176  
        @return `true` if wrapping a stream, `false` if default-constructed
177  
            or moved-from.
177  
            or moved-from.
178  
    */
178  
    */
179  
    explicit
179  
    explicit
180  
    operator bool() const noexcept
180  
    operator bool() const noexcept
181  
    {
181  
    {
182  
        return has_value();
182  
        return has_value();
183  
    }
183  
    }
184  

184  

185  
    /** Initiate an asynchronous read operation.
185  
    /** Initiate an asynchronous read operation.
186  

186  

187  
        Reads data into the provided buffer sequence. The operation
187  
        Reads data into the provided buffer sequence. The operation
188  
        completes when at least one byte has been read, or an error
188  
        completes when at least one byte has been read, or an error
189  
        occurs.
189  
        occurs.
190  

190  

191  
        @param buffers The buffer sequence to read into. Passed by
191  
        @param buffers The buffer sequence to read into. Passed by
192  
            value to ensure the sequence lives in the coroutine frame
192  
            value to ensure the sequence lives in the coroutine frame
193  
            across suspension points.
193  
            across suspension points.
194  

194  

195  
        @return An awaitable yielding `(error_code,std::size_t)`.
195  
        @return An awaitable yielding `(error_code,std::size_t)`.
196  

196  

197  
        @par Immediate Completion
197  
        @par Immediate Completion
198  
        The operation completes immediately without suspending
198  
        The operation completes immediately without suspending
199  
        the calling coroutine when the underlying stream's
199  
        the calling coroutine when the underlying stream's
200  
        awaitable reports immediate readiness via `await_ready`.
200  
        awaitable reports immediate readiness via `await_ready`.
201  

201  

202  
        @note This is a partial operation and may not process the
202  
        @note This is a partial operation and may not process the
203  
        entire buffer sequence. Use the composed @ref read algorithm
203  
        entire buffer sequence. Use the composed @ref read algorithm
204  
        for guaranteed complete transfer.
204  
        for guaranteed complete transfer.
205  

205  

206  
        @par Preconditions
206  
        @par Preconditions
207  
        The wrapper must contain a valid stream (`has_value() == true`).
207  
        The wrapper must contain a valid stream (`has_value() == true`).
208  
        The caller must not call this function again after a prior
208  
        The caller must not call this function again after a prior
209  
        call returned an error (including EOF).
209  
        call returned an error (including EOF).
210  
    */
210  
    */
211  
    template<MutableBufferSequence MB>
211  
    template<MutableBufferSequence MB>
212  
    auto
212  
    auto
213  
    read_some(MB buffers);
213  
    read_some(MB buffers);
214  

214  

215  
protected:
215  
protected:
216  
    /** Rebind to a new stream after move.
216  
    /** Rebind to a new stream after move.
217  

217  

218  
        Updates the internal pointer to reference a new stream object.
218  
        Updates the internal pointer to reference a new stream object.
219  
        Used by owning wrappers after move assignment when the owned
219  
        Used by owning wrappers after move assignment when the owned
220  
        object has moved to a new location.
220  
        object has moved to a new location.
221  

221  

222  
        @param new_stream The new stream to bind to. Must be the same
222  
        @param new_stream The new stream to bind to. Must be the same
223  
            type as the original stream.
223  
            type as the original stream.
224  

224  

225  
        @note Terminates if called with a stream of different type
225  
        @note Terminates if called with a stream of different type
226  
            than the original.
226  
            than the original.
227  
    */
227  
    */
228  
    template<ReadStream S>
228  
    template<ReadStream S>
229  
    void
229  
    void
230  
    rebind(S& new_stream) noexcept
230  
    rebind(S& new_stream) noexcept
231  
    {
231  
    {
232  
        if(vt_ != &vtable_for_impl<S>::value)
232  
        if(vt_ != &vtable_for_impl<S>::value)
233  
            std::terminate();
233  
            std::terminate();
234  
        stream_ = &new_stream;
234  
        stream_ = &new_stream;
235  
    }
235  
    }
236  
};
236  
};
237  

237  

238  
struct any_read_stream::vtable
238  
struct any_read_stream::vtable
239  
{
239  
{
240  
    // ordered by call frequency for cache line coherence
240  
    // ordered by call frequency for cache line coherence
241  
    void (*construct_awaitable)(
241  
    void (*construct_awaitable)(
242  
        void* stream,
242  
        void* stream,
243  
        void* storage,
243  
        void* storage,
244  
        std::span<mutable_buffer const> buffers);
244  
        std::span<mutable_buffer const> buffers);
245  
    bool (*await_ready)(void*);
245  
    bool (*await_ready)(void*);
246  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
246  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
247  
    io_result<std::size_t> (*await_resume)(void*);
247  
    io_result<std::size_t> (*await_resume)(void*);
248  
    void (*destroy_awaitable)(void*) noexcept;
248  
    void (*destroy_awaitable)(void*) noexcept;
249  
    std::size_t awaitable_size;
249  
    std::size_t awaitable_size;
250  
    std::size_t awaitable_align;
250  
    std::size_t awaitable_align;
251  
    void (*destroy)(void*) noexcept;
251  
    void (*destroy)(void*) noexcept;
252  
};
252  
};
253  

253  

254  
template<ReadStream S>
254  
template<ReadStream S>
255  
struct any_read_stream::vtable_for_impl
255  
struct any_read_stream::vtable_for_impl
256  
{
256  
{
257  
    using Awaitable = decltype(std::declval<S&>().read_some(
257  
    using Awaitable = decltype(std::declval<S&>().read_some(
258  
        std::span<mutable_buffer const>{}));
258  
        std::span<mutable_buffer const>{}));
259  

259  

260  
    static void
260  
    static void
261  
    do_destroy_impl(void* stream) noexcept
261  
    do_destroy_impl(void* stream) noexcept
262  
    {
262  
    {
263  
        static_cast<S*>(stream)->~S();
263  
        static_cast<S*>(stream)->~S();
264  
    }
264  
    }
265  

265  

266  
    static void
266  
    static void
267  
    construct_awaitable_impl(
267  
    construct_awaitable_impl(
268  
        void* stream,
268  
        void* stream,
269  
        void* storage,
269  
        void* storage,
270  
        std::span<mutable_buffer const> buffers)
270  
        std::span<mutable_buffer const> buffers)
271  
    {
271  
    {
272  
        auto& s = *static_cast<S*>(stream);
272  
        auto& s = *static_cast<S*>(stream);
273  
        ::new(storage) Awaitable(s.read_some(buffers));
273  
        ::new(storage) Awaitable(s.read_some(buffers));
274  
    }
274  
    }
275  

275  

276  
    static constexpr vtable value = {
276  
    static constexpr vtable value = {
277  
        &construct_awaitable_impl,
277  
        &construct_awaitable_impl,
278  
        +[](void* p) {
278  
        +[](void* p) {
279  
            return static_cast<Awaitable*>(p)->await_ready();
279  
            return static_cast<Awaitable*>(p)->await_ready();
280  
        },
280  
        },
281  
        +[](void* p, std::coroutine_handle<> h, io_env const* env) {
281  
        +[](void* p, std::coroutine_handle<> h, io_env const* env) {
282  
            return detail::call_await_suspend(
282  
            return detail::call_await_suspend(
283  
                static_cast<Awaitable*>(p), h, env);
283  
                static_cast<Awaitable*>(p), h, env);
284  
        },
284  
        },
285  
        +[](void* p) {
285  
        +[](void* p) {
286  
            return static_cast<Awaitable*>(p)->await_resume();
286  
            return static_cast<Awaitable*>(p)->await_resume();
287  
        },
287  
        },
288  
        +[](void* p) noexcept {
288  
        +[](void* p) noexcept {
289  
            static_cast<Awaitable*>(p)->~Awaitable();
289  
            static_cast<Awaitable*>(p)->~Awaitable();
290  
        },
290  
        },
291  
        sizeof(Awaitable),
291  
        sizeof(Awaitable),
292  
        alignof(Awaitable),
292  
        alignof(Awaitable),
293  
        &do_destroy_impl
293  
        &do_destroy_impl
294  
    };
294  
    };
295  
};
295  
};
296  

296  

297  
inline
297  
inline
298  
any_read_stream::~any_read_stream()
298  
any_read_stream::~any_read_stream()
299  
{
299  
{
300  
    if(storage_)
300  
    if(storage_)
301  
    {
301  
    {
302  
        vt_->destroy(stream_);
302  
        vt_->destroy(stream_);
303  
        ::operator delete(storage_);
303  
        ::operator delete(storage_);
304  
    }
304  
    }
305  
    if(cached_awaitable_)
305  
    if(cached_awaitable_)
306  
    {
306  
    {
307  
        if(awaitable_active_)
307  
        if(awaitable_active_)
308  
            vt_->destroy_awaitable(cached_awaitable_);
308  
            vt_->destroy_awaitable(cached_awaitable_);
309  
        ::operator delete(cached_awaitable_);
309  
        ::operator delete(cached_awaitable_);
310  
    }
310  
    }
311  
}
311  
}
312  

312  

313  
// Move assignment: releases the current contents, then takes over
// those of `other`, leaving `other` empty. Self-assignment is a no-op.
//
// The current contents are torn down in reverse order of
// construction: an awaitable that is still alive was produced by the
// stream's read_some and may refer to the stream, so it is destroyed
// before the stream is.
inline any_read_stream&
any_read_stream::operator=(any_read_stream&& other) noexcept
{
    if(this == &other)
        return *this;

    if(cached_awaitable_)
    {
        if(awaitable_active_)
            vt_->destroy_awaitable(cached_awaitable_);
        ::operator delete(cached_awaitable_);
    }
    // A non-null storage_ means the stream is owned by this wrapper.
    if(storage_)
    {
        vt_->destroy(stream_);
        ::operator delete(storage_);
    }

    stream_ = std::exchange(other.stream_, nullptr);
    vt_ = std::exchange(other.vt_, nullptr);
    cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
    storage_ = std::exchange(other.storage_, nullptr);
    awaitable_active_ = std::exchange(other.awaitable_active_, false);
    return *this;
}
337  

337  

338  
template<ReadStream S>
338  
template<ReadStream S>
339  
    requires (!std::same_as<std::decay_t<S>, any_read_stream>)
339  
    requires (!std::same_as<std::decay_t<S>, any_read_stream>)
340  
any_read_stream::any_read_stream(S s)
340  
any_read_stream::any_read_stream(S s)
341  
    : vt_(&vtable_for_impl<S>::value)
341  
    : vt_(&vtable_for_impl<S>::value)
342  
{
342  
{
343  
    struct guard {
343  
    struct guard {
344  
        any_read_stream* self;
344  
        any_read_stream* self;
345  
        bool committed = false;
345  
        bool committed = false;
346  
        ~guard() {
346  
        ~guard() {
347  
            if(!committed && self->storage_) {
347  
            if(!committed && self->storage_) {
348  
                self->vt_->destroy(self->stream_);
348  
                self->vt_->destroy(self->stream_);
349  
                ::operator delete(self->storage_);
349  
                ::operator delete(self->storage_);
350  
                self->storage_ = nullptr;
350  
                self->storage_ = nullptr;
351  
                self->stream_ = nullptr;
351  
                self->stream_ = nullptr;
352  
            }
352  
            }
353  
        }
353  
        }
354  
    } g{this};
354  
    } g{this};
355  

355  

356  
    storage_ = ::operator new(sizeof(S));
356  
    storage_ = ::operator new(sizeof(S));
357  
    stream_ = ::new(storage_) S(std::move(s));
357  
    stream_ = ::new(storage_) S(std::move(s));
358  

358  

359  
    // Preallocate the awaitable storage
359  
    // Preallocate the awaitable storage
360  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
360  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
361  

361  

362  
    g.committed = true;
362  
    g.committed = true;
363  
}
363  
}
364  

364  

365  
template<ReadStream S>
365  
template<ReadStream S>
366  
any_read_stream::any_read_stream(S* s)
366  
any_read_stream::any_read_stream(S* s)
367  
    : stream_(s)
367  
    : stream_(s)
368  
    , vt_(&vtable_for_impl<S>::value)
368  
    , vt_(&vtable_for_impl<S>::value)
369  
{
369  
{
370  
    // Preallocate the awaitable storage
370  
    // Preallocate the awaitable storage
371  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
371  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
372  
}
372  
}
373  

373  

374  
template<MutableBufferSequence MB>
374  
template<MutableBufferSequence MB>
375  
auto
375  
auto
376  
any_read_stream::read_some(MB buffers)
376  
any_read_stream::read_some(MB buffers)
377  
{
377  
{
378  
    // VFALCO in theory, we could use if constexpr to detect a
378  
    // VFALCO in theory, we could use if constexpr to detect a
379  
    // span and then pass that through to read_some without the array
379  
    // span and then pass that through to read_some without the array
380  
    struct awaitable
380  
    struct awaitable
381  
    {
381  
    {
382  
        any_read_stream* self_;
382  
        any_read_stream* self_;
383  
        mutable_buffer_array<detail::max_iovec_> ba_;
383  
        mutable_buffer_array<detail::max_iovec_> ba_;
384  

384  

385  
        bool
385  
        bool
386  
        await_ready()
386  
        await_ready()
387  
        {
387  
        {
388  
            self_->vt_->construct_awaitable(
388  
            self_->vt_->construct_awaitable(
389  
                self_->stream_,
389  
                self_->stream_,
390  
                self_->cached_awaitable_,
390  
                self_->cached_awaitable_,
391  
                ba_.to_span());
391  
                ba_.to_span());
392  
            self_->awaitable_active_ = true;
392  
            self_->awaitable_active_ = true;
393  

393  

394  
            return self_->vt_->await_ready(
394  
            return self_->vt_->await_ready(
395  
                self_->cached_awaitable_);
395  
                self_->cached_awaitable_);
396  
        }
396  
        }
397  

397  

398  
        std::coroutine_handle<>
398  
        std::coroutine_handle<>
399  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
399  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
400  
        {
400  
        {
401  
            return self_->vt_->await_suspend(
401  
            return self_->vt_->await_suspend(
402  
                self_->cached_awaitable_, h, env);
402  
                self_->cached_awaitable_, h, env);
403  
        }
403  
        }
404  

404  

405  
        io_result<std::size_t>
405  
        io_result<std::size_t>
406  
        await_resume()
406  
        await_resume()
407  
        {
407  
        {
408  
            struct guard {
408  
            struct guard {
409  
                any_read_stream* self;
409  
                any_read_stream* self;
410  
                ~guard() {
410  
                ~guard() {
411  
                    self->vt_->destroy_awaitable(self->cached_awaitable_);
411  
                    self->vt_->destroy_awaitable(self->cached_awaitable_);
412  
                    self->awaitable_active_ = false;
412  
                    self->awaitable_active_ = false;
413  
                }
413  
                }
414  
            } g{self_};
414  
            } g{self_};
415  
            return self_->vt_->await_resume(
415  
            return self_->vt_->await_resume(
416  
                self_->cached_awaitable_);
416  
                self_->cached_awaitable_);
417  
        }
417  
        }
418  
    };
418  
    };
419  
    return awaitable{this,
419  
    return awaitable{this,
420  
        mutable_buffer_array<detail::max_iovec_>(buffers)};
420  
        mutable_buffer_array<detail::max_iovec_>(buffers)};
421  
}
421  
}
422  

422  

423  
} // namespace capy
423  
} // namespace capy
424  
} // namespace boost
424  
} // namespace boost
425  

425  

426  
#endif
426  
#endif