1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
16  
#include <boost/capy/buffers/buffer_array.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
18  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <boost/capy/concept/io_awaitable.hpp>
19  
#include <boost/capy/concept/write_sink.hpp>
19  
#include <boost/capy/concept/write_sink.hpp>
20  
#include <coroutine>
20  
#include <coroutine>
21  
#include <boost/capy/ex/io_env.hpp>
21  
#include <boost/capy/ex/io_env.hpp>
22  
#include <boost/capy/io_result.hpp>
22  
#include <boost/capy/io_result.hpp>
23  
#include <boost/capy/io_task.hpp>
23  
#include <boost/capy/io_task.hpp>
24  

24  

25  
#include <concepts>
25  
#include <concepts>
26  
#include <coroutine>
26  
#include <coroutine>
27  
#include <cstddef>
27  
#include <cstddef>
28  
#include <exception>
28  
#include <exception>
29  
#include <new>
29  
#include <new>
30  
#include <span>
30  
#include <span>
31  
#include <stop_token>
31  
#include <stop_token>
32  
#include <system_error>
32  
#include <system_error>
33  
#include <utility>
33  
#include <utility>
34  

34  

35  
namespace boost {
35  
namespace boost {
36  
namespace capy {
36  
namespace capy {
37  

37  

38  
/** Type-erased wrapper for any WriteSink.
38  
/** Type-erased wrapper for any WriteSink.
39  

39  

40  
    This class provides type erasure for any type satisfying the
40  
    This class provides type erasure for any type satisfying the
41  
    @ref WriteSink concept, enabling runtime polymorphism for
41  
    @ref WriteSink concept, enabling runtime polymorphism for
42  
    sink write operations. It uses cached awaitable storage to achieve
42  
    sink write operations. It uses cached awaitable storage to achieve
43  
    zero steady-state allocation after construction.
43  
    zero steady-state allocation after construction.
44  

44  

45  
    The wrapper supports two construction modes:
45  
    The wrapper supports two construction modes:
46  
    - **Owning**: Pass by value to transfer ownership. The wrapper
46  
    - **Owning**: Pass by value to transfer ownership. The wrapper
47  
      allocates storage and owns the sink.
47  
      allocates storage and owns the sink.
48  
    - **Reference**: Pass a pointer to wrap without ownership. The
48  
    - **Reference**: Pass a pointer to wrap without ownership. The
49  
      pointed-to sink must outlive this wrapper.
49  
      pointed-to sink must outlive this wrapper.
50  

50  

51  
    @par Awaitable Preallocation
51  
    @par Awaitable Preallocation
52  
    The constructor preallocates storage for the type-erased awaitable.
52  
    The constructor preallocates storage for the type-erased awaitable.
53  
    This reserves all virtual address space at server startup
53  
    This reserves all virtual address space at server startup
54  
    so memory usage can be measured up front, rather than
54  
    so memory usage can be measured up front, rather than
55  
    allocating piecemeal as traffic arrives.
55  
    allocating piecemeal as traffic arrives.
56  

56  

57  
    @par Immediate Completion
57  
    @par Immediate Completion
58  
    Operations complete immediately without suspending when the
58  
    Operations complete immediately without suspending when the
59  
    buffer sequence is empty, or when the underlying sink's
59  
    buffer sequence is empty, or when the underlying sink's
60  
    awaitable reports readiness via `await_ready`.
60  
    awaitable reports readiness via `await_ready`.
61  

61  

62  
    @par Thread Safety
62  
    @par Thread Safety
63  
    Not thread-safe. Concurrent operations on the same wrapper
63  
    Not thread-safe. Concurrent operations on the same wrapper
64  
    are undefined behavior.
64  
    are undefined behavior.
65  

65  

66  
    @par Example
66  
    @par Example
67  
    @code
67  
    @code
68  
    // Owning - takes ownership of the sink
68  
    // Owning - takes ownership of the sink
69  
    any_write_sink ws(some_sink{args...});
69  
    any_write_sink ws(some_sink{args...});
70  

70  

71  
    // Reference - wraps without ownership
71  
    // Reference - wraps without ownership
72  
    some_sink sink;
72  
    some_sink sink;
73  
    any_write_sink ws(&sink);
73  
    any_write_sink ws(&sink);
74  

74  

75  
    const_buffer buf(data, size);
75  
    const_buffer buf(data, size);
76  
    auto [ec, n] = co_await ws.write(std::span(&buf, 1));
76  
    auto [ec, n] = co_await ws.write(std::span(&buf, 1));
77  
    auto [ec2] = co_await ws.write_eof();
77  
    auto [ec2] = co_await ws.write_eof();
78  
    @endcode
78  
    @endcode
79  

79  

80  
    @see any_write_stream, WriteSink
80  
    @see any_write_stream, WriteSink
81  
*/
81  
*/
82  
class any_write_sink
82  
class any_write_sink
83  
{
83  
{
84  
    struct vtable;
84  
    struct vtable;
85  
    struct write_awaitable_ops;
85  
    struct write_awaitable_ops;
86  
    struct eof_awaitable_ops;
86  
    struct eof_awaitable_ops;
87  

87  

88  
    template<WriteSink S>
88  
    template<WriteSink S>
89  
    struct vtable_for_impl;
89  
    struct vtable_for_impl;
90  

90  

91  
    void* sink_ = nullptr;
91  
    void* sink_ = nullptr;
92  
    vtable const* vt_ = nullptr;
92  
    vtable const* vt_ = nullptr;
93  
    void* cached_awaitable_ = nullptr;
93  
    void* cached_awaitable_ = nullptr;
94  
    void* storage_ = nullptr;
94  
    void* storage_ = nullptr;
95  
    write_awaitable_ops const* active_write_ops_ = nullptr;
95  
    write_awaitable_ops const* active_write_ops_ = nullptr;
96  
    eof_awaitable_ops const* active_eof_ops_ = nullptr;
96  
    eof_awaitable_ops const* active_eof_ops_ = nullptr;
97  

97  

98  
public:
98  
public:
99  
    /** Destructor.
99  
    /** Destructor.
100  

100  

101  
        Destroys the owned sink (if any) and releases the cached
101  
        Destroys the owned sink (if any) and releases the cached
102  
        awaitable storage.
102  
        awaitable storage.
103  
    */
103  
    */
104  
    ~any_write_sink();
104  
    ~any_write_sink();
105  

105  

106  
    /** Construct a default instance.
106  
    /** Construct a default instance.
107  

107  

108  
        Constructs an empty wrapper. Operations on a default-constructed
108  
        Constructs an empty wrapper. Operations on a default-constructed
109  
        wrapper result in undefined behavior.
109  
        wrapper result in undefined behavior.
110  
    */
110  
    */
111  
    any_write_sink() = default;
111  
    any_write_sink() = default;
112  

112  

113  
    /** Non-copyable.
113  
    /** Non-copyable.
114  

114  

115  
        The awaitable cache is per-instance and cannot be shared.
115  
        The awaitable cache is per-instance and cannot be shared.
116  
    */
116  
    */
117  
    any_write_sink(any_write_sink const&) = delete;
117  
    any_write_sink(any_write_sink const&) = delete;
118  
    any_write_sink& operator=(any_write_sink const&) = delete;
118  
    any_write_sink& operator=(any_write_sink const&) = delete;
119  

119  

120  
    /** Construct by moving.
120  
    /** Construct by moving.
121  

121  

122  
        Transfers ownership of the wrapped sink (if owned) and
122  
        Transfers ownership of the wrapped sink (if owned) and
123  
        cached awaitable storage from `other`. After the move, `other` is
123  
        cached awaitable storage from `other`. After the move, `other` is
124  
        in a default-constructed state.
124  
        in a default-constructed state.
125  

125  

126  
        @param other The wrapper to move from.
126  
        @param other The wrapper to move from.
127  
    */
127  
    */
128  
    any_write_sink(any_write_sink&& other) noexcept
128  
    any_write_sink(any_write_sink&& other) noexcept
129  
        : sink_(std::exchange(other.sink_, nullptr))
129  
        : sink_(std::exchange(other.sink_, nullptr))
130  
        , vt_(std::exchange(other.vt_, nullptr))
130  
        , vt_(std::exchange(other.vt_, nullptr))
131  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
131  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
132  
        , storage_(std::exchange(other.storage_, nullptr))
132  
        , storage_(std::exchange(other.storage_, nullptr))
133  
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
133  
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
134  
        , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
134  
        , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
135  
    {
135  
    {
136  
    }
136  
    }
137  

137  

138  
    /** Assign by moving.
138  
    /** Assign by moving.
139  

139  

140  
        Destroys any owned sink and releases existing resources,
140  
        Destroys any owned sink and releases existing resources,
141  
        then transfers ownership from `other`.
141  
        then transfers ownership from `other`.
142  

142  

143  
        @param other The wrapper to move from.
143  
        @param other The wrapper to move from.
144  
        @return Reference to this wrapper.
144  
        @return Reference to this wrapper.
145  
    */
145  
    */
146  
    any_write_sink&
146  
    any_write_sink&
147  
    operator=(any_write_sink&& other) noexcept;
147  
    operator=(any_write_sink&& other) noexcept;
148  

148  

149  
    /** Construct by taking ownership of a WriteSink.
149  
    /** Construct by taking ownership of a WriteSink.
150  

150  

151  
        Allocates storage and moves the sink into this wrapper.
151  
        Allocates storage and moves the sink into this wrapper.
152  
        The wrapper owns the sink and will destroy it.
152  
        The wrapper owns the sink and will destroy it.
153  

153  

154  
        @param s The sink to take ownership of.
154  
        @param s The sink to take ownership of.
155  
    */
155  
    */
156  
    template<WriteSink S>
156  
    template<WriteSink S>
157  
        requires (!std::same_as<std::decay_t<S>, any_write_sink>)
157  
        requires (!std::same_as<std::decay_t<S>, any_write_sink>)
158  
    any_write_sink(S s);
158  
    any_write_sink(S s);
159  

159  

160  
    /** Construct by wrapping a WriteSink without ownership.
160  
    /** Construct by wrapping a WriteSink without ownership.
161  

161  

162  
        Wraps the given sink by pointer. The sink must remain
162  
        Wraps the given sink by pointer. The sink must remain
163  
        valid for the lifetime of this wrapper.
163  
        valid for the lifetime of this wrapper.
164  

164  

165  
        @param s Pointer to the sink to wrap.
165  
        @param s Pointer to the sink to wrap.
166  
    */
166  
    */
167  
    template<WriteSink S>
167  
    template<WriteSink S>
168  
    any_write_sink(S* s);
168  
    any_write_sink(S* s);
169  

169  

170  
    /** Check if the wrapper contains a valid sink.
170  
    /** Check if the wrapper contains a valid sink.
171  

171  

172  
        @return `true` if wrapping a sink, `false` if default-constructed
172  
        @return `true` if wrapping a sink, `false` if default-constructed
173  
            or moved-from.
173  
            or moved-from.
174  
    */
174  
    */
175  
    bool
175  
    bool
176  
    has_value() const noexcept
176  
    has_value() const noexcept
177  
    {
177  
    {
178  
        return sink_ != nullptr;
178  
        return sink_ != nullptr;
179  
    }
179  
    }
180  

180  

181  
    /** Check if the wrapper contains a valid sink.
181  
    /** Check if the wrapper contains a valid sink.
182  

182  

183  
        @return `true` if wrapping a sink, `false` if default-constructed
183  
        @return `true` if wrapping a sink, `false` if default-constructed
184  
            or moved-from.
184  
            or moved-from.
185  
    */
185  
    */
186  
    explicit
186  
    explicit
187  
    operator bool() const noexcept
187  
    operator bool() const noexcept
188  
    {
188  
    {
189  
        return has_value();
189  
        return has_value();
190  
    }
190  
    }
191  

191  

192  
    /** Initiate a partial write operation.
192  
    /** Initiate a partial write operation.
193  

193  

194  
        Writes one or more bytes from the provided buffer sequence.
194  
        Writes one or more bytes from the provided buffer sequence.
195  
        May consume less than the full sequence.
195  
        May consume less than the full sequence.
196  

196  

197  
        @param buffers The buffer sequence containing data to write.
197  
        @param buffers The buffer sequence containing data to write.
198  

198  

199  
        @return An awaitable yielding `(error_code,std::size_t)`.
199  
        @return An awaitable yielding `(error_code,std::size_t)`.
200  

200  

201  
        @par Immediate Completion
201  
        @par Immediate Completion
202  
        The operation completes immediately without suspending
202  
        The operation completes immediately without suspending
203  
        the calling coroutine when:
203  
        the calling coroutine when:
204  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
204  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
205  
        @li The underlying sink's awaitable reports immediate
205  
        @li The underlying sink's awaitable reports immediate
206  
            readiness via `await_ready`.
206  
            readiness via `await_ready`.
207  

207  

208  
        @note This is a partial operation and may not process the
208  
        @note This is a partial operation and may not process the
209  
        entire buffer sequence. Use @ref write for guaranteed
209  
        entire buffer sequence. Use @ref write for guaranteed
210  
        complete transfer.
210  
        complete transfer.
211  

211  

212  
        @par Preconditions
212  
        @par Preconditions
213  
        The wrapper must contain a valid sink (`has_value() == true`).
213  
        The wrapper must contain a valid sink (`has_value() == true`).
214  
    */
214  
    */
215  
    template<ConstBufferSequence CB>
215  
    template<ConstBufferSequence CB>
216  
    auto
216  
    auto
217  
    write_some(CB buffers);
217  
    write_some(CB buffers);
218  

218  

219  
    /** Initiate a complete write operation.
219  
    /** Initiate a complete write operation.
220  

220  

221  
        Writes data from the provided buffer sequence. The operation
221  
        Writes data from the provided buffer sequence. The operation
222  
        completes when all bytes have been consumed, or an error
222  
        completes when all bytes have been consumed, or an error
223  
        occurs. Forwards to the underlying sink's `write` operation,
223  
        occurs. Forwards to the underlying sink's `write` operation,
224  
        windowed through @ref buffer_param when the sequence exceeds
224  
        windowed through @ref buffer_param when the sequence exceeds
225  
        the per-call buffer limit.
225  
        the per-call buffer limit.
226  

226  

227  
        @param buffers The buffer sequence containing data to write.
227  
        @param buffers The buffer sequence containing data to write.
228  

228  

229  
        @return An awaitable yielding `(error_code,std::size_t)`.
229  
        @return An awaitable yielding `(error_code,std::size_t)`.
230  

230  

231  
        @par Immediate Completion
231  
        @par Immediate Completion
232  
        The operation completes immediately without suspending
232  
        The operation completes immediately without suspending
233  
        the calling coroutine when:
233  
        the calling coroutine when:
234  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
234  
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
235  
        @li Every underlying `write` call completes
235  
        @li Every underlying `write` call completes
236  
            immediately (the wrapped sink reports readiness
236  
            immediately (the wrapped sink reports readiness
237  
            via `await_ready` on each iteration).
237  
            via `await_ready` on each iteration).
238  

238  

239  
        @par Preconditions
239  
        @par Preconditions
240  
        The wrapper must contain a valid sink (`has_value() == true`).
240  
        The wrapper must contain a valid sink (`has_value() == true`).
241  
    */
241  
    */
242  
    template<ConstBufferSequence CB>
242  
    template<ConstBufferSequence CB>
243  
    io_task<std::size_t>
243  
    io_task<std::size_t>
244  
    write(CB buffers);
244  
    write(CB buffers);
245  

245  

246  
    /** Atomically write data and signal end-of-stream.
246  
    /** Atomically write data and signal end-of-stream.
247  

247  

248  
        Writes all data from the buffer sequence and then signals
248  
        Writes all data from the buffer sequence and then signals
249  
        end-of-stream. The implementation decides how to partition
249  
        end-of-stream. The implementation decides how to partition
250  
        the data across calls to the underlying sink's @ref write
250  
        the data across calls to the underlying sink's @ref write
251  
        and `write_eof`. When the caller's buffer sequence is
251  
        and `write_eof`. When the caller's buffer sequence is
252  
        non-empty, the final call to the underlying sink is always
252  
        non-empty, the final call to the underlying sink is always
253  
        `write_eof` with a non-empty buffer sequence. When the
253  
        `write_eof` with a non-empty buffer sequence. When the
254  
        caller's buffer sequence is empty, only `write_eof()` with
254  
        caller's buffer sequence is empty, only `write_eof()` with
255  
        no data is called.
255  
        no data is called.
256  

256  

257  
        @param buffers The buffer sequence containing data to write.
257  
        @param buffers The buffer sequence containing data to write.
258  

258  

259  
        @return An awaitable yielding `(error_code,std::size_t)`.
259  
        @return An awaitable yielding `(error_code,std::size_t)`.
260  

260  

261  
        @par Immediate Completion
261  
        @par Immediate Completion
262  
        The operation completes immediately without suspending
262  
        The operation completes immediately without suspending
263  
        the calling coroutine when:
263  
        the calling coroutine when:
264  
        @li The buffer sequence is empty. Only the @ref write_eof()
264  
        @li The buffer sequence is empty. Only the @ref write_eof()
265  
            call is performed.
265  
            call is performed.
266  
        @li All underlying operations complete immediately (the
266  
        @li All underlying operations complete immediately (the
267  
            wrapped sink reports readiness via `await_ready`).
267  
            wrapped sink reports readiness via `await_ready`).
268  

268  

269  
        @par Preconditions
269  
        @par Preconditions
270  
        The wrapper must contain a valid sink (`has_value() == true`).
270  
        The wrapper must contain a valid sink (`has_value() == true`).
271  
    */
271  
    */
272  
    template<ConstBufferSequence CB>
272  
    template<ConstBufferSequence CB>
273  
    io_task<std::size_t>
273  
    io_task<std::size_t>
274  
    write_eof(CB buffers);
274  
    write_eof(CB buffers);
275  

275  

276  
    /** Signal end of data.
276  
    /** Signal end of data.
277  

277  

278  
        Indicates that no more data will be written to the sink.
278  
        Indicates that no more data will be written to the sink.
279  
        The operation completes when the sink is finalized, or
279  
        The operation completes when the sink is finalized, or
280  
        an error occurs.
280  
        an error occurs.
281  

281  

282  
        @return An awaitable yielding `(error_code)`.
282  
        @return An awaitable yielding `(error_code)`.
283  

283  

284  
        @par Immediate Completion
284  
        @par Immediate Completion
285  
        The operation completes immediately without suspending
285  
        The operation completes immediately without suspending
286  
        the calling coroutine when the underlying sink's awaitable
286  
        the calling coroutine when the underlying sink's awaitable
287  
        reports immediate readiness via `await_ready`.
287  
        reports immediate readiness via `await_ready`.
288  

288  

289  
        @par Preconditions
289  
        @par Preconditions
290  
        The wrapper must contain a valid sink (`has_value() == true`).
290  
        The wrapper must contain a valid sink (`has_value() == true`).
291  
    */
291  
    */
292  
    auto
292  
    auto
293  
    write_eof();
293  
    write_eof();
294  

294  

295  
protected:
295  
protected:
296  
    /** Rebind to a new sink after move.
296  
    /** Rebind to a new sink after move.
297  

297  

298  
        Updates the internal pointer to reference a new sink object.
298  
        Updates the internal pointer to reference a new sink object.
299  
        Used by owning wrappers after move assignment when the owned
299  
        Used by owning wrappers after move assignment when the owned
300  
        object has moved to a new location.
300  
        object has moved to a new location.
301  

301  

302  
        @param new_sink The new sink to bind to. Must be the same
302  
        @param new_sink The new sink to bind to. Must be the same
303  
            type as the original sink.
303  
            type as the original sink.
304  

304  

305  
        @note Terminates if called with a sink of different type
305  
        @note Terminates if called with a sink of different type
306  
            than the original.
306  
            than the original.
307  
    */
307  
    */
308  
    template<WriteSink S>
308  
    template<WriteSink S>
309  
    void
309  
    void
310  
    rebind(S& new_sink) noexcept
310  
    rebind(S& new_sink) noexcept
311  
    {
311  
    {
312  
        if(vt_ != &vtable_for_impl<S>::value)
312  
        if(vt_ != &vtable_for_impl<S>::value)
313  
            std::terminate();
313  
            std::terminate();
314  
        sink_ = &new_sink;
314  
        sink_ = &new_sink;
315  
    }
315  
    }
316  

316  

317  
private:
317  
private:
318  
    auto
318  
    auto
319  
    write_some_(std::span<const_buffer const> buffers);
319  
    write_some_(std::span<const_buffer const> buffers);
320  

320  

321  
    auto
321  
    auto
322  
    write_(std::span<const_buffer const> buffers);
322  
    write_(std::span<const_buffer const> buffers);
323  

323  

324  
    auto
324  
    auto
325  
    write_eof_buffers_(std::span<const_buffer const> buffers);
325  
    write_eof_buffers_(std::span<const_buffer const> buffers);
326  
};
326  
};
327  

327  

328  
struct any_write_sink::write_awaitable_ops
328  
struct any_write_sink::write_awaitable_ops
329  
{
329  
{
330  
    bool (*await_ready)(void*);
330  
    bool (*await_ready)(void*);
331  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
331  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
332  
    io_result<std::size_t> (*await_resume)(void*);
332  
    io_result<std::size_t> (*await_resume)(void*);
333  
    void (*destroy)(void*) noexcept;
333  
    void (*destroy)(void*) noexcept;
334  
};
334  
};
335  

335  

336  
struct any_write_sink::eof_awaitable_ops
336  
struct any_write_sink::eof_awaitable_ops
337  
{
337  
{
338  
    bool (*await_ready)(void*);
338  
    bool (*await_ready)(void*);
339  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
339  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
340  
    io_result<> (*await_resume)(void*);
340  
    io_result<> (*await_resume)(void*);
341  
    void (*destroy)(void*) noexcept;
341  
    void (*destroy)(void*) noexcept;
342  
};
342  
};
343  

343  

344  
struct any_write_sink::vtable
344  
struct any_write_sink::vtable
345  
{
345  
{
346  
    write_awaitable_ops const* (*construct_write_some_awaitable)(
346  
    write_awaitable_ops const* (*construct_write_some_awaitable)(
347  
        void* sink,
347  
        void* sink,
348  
        void* storage,
348  
        void* storage,
349  
        std::span<const_buffer const> buffers);
349  
        std::span<const_buffer const> buffers);
350  
    write_awaitable_ops const* (*construct_write_awaitable)(
350  
    write_awaitable_ops const* (*construct_write_awaitable)(
351  
        void* sink,
351  
        void* sink,
352  
        void* storage,
352  
        void* storage,
353  
        std::span<const_buffer const> buffers);
353  
        std::span<const_buffer const> buffers);
354  
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
354  
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
355  
        void* sink,
355  
        void* sink,
356  
        void* storage,
356  
        void* storage,
357  
        std::span<const_buffer const> buffers);
357  
        std::span<const_buffer const> buffers);
358  
    eof_awaitable_ops const* (*construct_eof_awaitable)(
358  
    eof_awaitable_ops const* (*construct_eof_awaitable)(
359  
        void* sink,
359  
        void* sink,
360  
        void* storage);
360  
        void* storage);
361  
    std::size_t awaitable_size;
361  
    std::size_t awaitable_size;
362  
    std::size_t awaitable_align;
362  
    std::size_t awaitable_align;
363  
    void (*destroy)(void*) noexcept;
363  
    void (*destroy)(void*) noexcept;
364  
};
364  
};
365  

365  

366  
template<WriteSink S>
366  
template<WriteSink S>
367  
struct any_write_sink::vtable_for_impl
367  
struct any_write_sink::vtable_for_impl
368  
{
368  
{
369  
    using WriteSomeAwaitable = decltype(std::declval<S&>().write_some(
369  
    using WriteSomeAwaitable = decltype(std::declval<S&>().write_some(
370  
        std::span<const_buffer const>{}));
370  
        std::span<const_buffer const>{}));
371  
    using WriteAwaitable = decltype(std::declval<S&>().write(
371  
    using WriteAwaitable = decltype(std::declval<S&>().write(
372  
        std::span<const_buffer const>{}));
372  
        std::span<const_buffer const>{}));
373  
    using WriteEofBuffersAwaitable = decltype(std::declval<S&>().write_eof(
373  
    using WriteEofBuffersAwaitable = decltype(std::declval<S&>().write_eof(
374  
        std::span<const_buffer const>{}));
374  
        std::span<const_buffer const>{}));
375  
    using EofAwaitable = decltype(std::declval<S&>().write_eof());
375  
    using EofAwaitable = decltype(std::declval<S&>().write_eof());
376  

376  

377  
    static void
377  
    static void
378  
    do_destroy_impl(void* sink) noexcept
378  
    do_destroy_impl(void* sink) noexcept
379  
    {
379  
    {
380  
        static_cast<S*>(sink)->~S();
380  
        static_cast<S*>(sink)->~S();
381  
    }
381  
    }
382  

382  

383  
    static write_awaitable_ops const*
383  
    static write_awaitable_ops const*
384  
    construct_write_some_awaitable_impl(
384  
    construct_write_some_awaitable_impl(
385  
        void* sink,
385  
        void* sink,
386  
        void* storage,
386  
        void* storage,
387  
        std::span<const_buffer const> buffers)
387  
        std::span<const_buffer const> buffers)
388  
    {
388  
    {
389  
        auto& s = *static_cast<S*>(sink);
389  
        auto& s = *static_cast<S*>(sink);
390  
        ::new(storage) WriteSomeAwaitable(s.write_some(buffers));
390  
        ::new(storage) WriteSomeAwaitable(s.write_some(buffers));
391  

391  

392  
        static constexpr write_awaitable_ops ops = {
392  
        static constexpr write_awaitable_ops ops = {
393  
            +[](void* p) {
393  
            +[](void* p) {
394  
                return static_cast<WriteSomeAwaitable*>(p)->await_ready();
394  
                return static_cast<WriteSomeAwaitable*>(p)->await_ready();
395  
            },
395  
            },
396  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
396  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
397  
                return detail::call_await_suspend(
397  
                return detail::call_await_suspend(
398  
                    static_cast<WriteSomeAwaitable*>(p), h, env);
398  
                    static_cast<WriteSomeAwaitable*>(p), h, env);
399  
            },
399  
            },
400  
            +[](void* p) {
400  
            +[](void* p) {
401  
                return static_cast<WriteSomeAwaitable*>(p)->await_resume();
401  
                return static_cast<WriteSomeAwaitable*>(p)->await_resume();
402  
            },
402  
            },
403  
            +[](void* p) noexcept {
403  
            +[](void* p) noexcept {
404  
                static_cast<WriteSomeAwaitable*>(p)->~WriteSomeAwaitable();
404  
                static_cast<WriteSomeAwaitable*>(p)->~WriteSomeAwaitable();
405  
            }
405  
            }
406  
        };
406  
        };
407  
        return &ops;
407  
        return &ops;
408  
    }
408  
    }
409  

409  

410  
    static write_awaitable_ops const*
410  
    static write_awaitable_ops const*
411  
    construct_write_awaitable_impl(
411  
    construct_write_awaitable_impl(
412  
        void* sink,
412  
        void* sink,
413  
        void* storage,
413  
        void* storage,
414  
        std::span<const_buffer const> buffers)
414  
        std::span<const_buffer const> buffers)
415  
    {
415  
    {
416  
        auto& s = *static_cast<S*>(sink);
416  
        auto& s = *static_cast<S*>(sink);
417  
        ::new(storage) WriteAwaitable(s.write(buffers));
417  
        ::new(storage) WriteAwaitable(s.write(buffers));
418  

418  

419  
        static constexpr write_awaitable_ops ops = {
419  
        static constexpr write_awaitable_ops ops = {
420  
            +[](void* p) {
420  
            +[](void* p) {
421  
                return static_cast<WriteAwaitable*>(p)->await_ready();
421  
                return static_cast<WriteAwaitable*>(p)->await_ready();
422  
            },
422  
            },
423  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
423  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
424  
                return detail::call_await_suspend(
424  
                return detail::call_await_suspend(
425  
                    static_cast<WriteAwaitable*>(p), h, env);
425  
                    static_cast<WriteAwaitable*>(p), h, env);
426  
            },
426  
            },
427  
            +[](void* p) {
427  
            +[](void* p) {
428  
                return static_cast<WriteAwaitable*>(p)->await_resume();
428  
                return static_cast<WriteAwaitable*>(p)->await_resume();
429  
            },
429  
            },
430  
            +[](void* p) noexcept {
430  
            +[](void* p) noexcept {
431  
                static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
431  
                static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
432  
            }
432  
            }
433  
        };
433  
        };
434  
        return &ops;
434  
        return &ops;
435  
    }
435  
    }
436  

436  

437  
    static write_awaitable_ops const*
437  
    static write_awaitable_ops const*
438  
    construct_write_eof_buffers_awaitable_impl(
438  
    construct_write_eof_buffers_awaitable_impl(
439  
        void* sink,
439  
        void* sink,
440  
        void* storage,
440  
        void* storage,
441  
        std::span<const_buffer const> buffers)
441  
        std::span<const_buffer const> buffers)
442  
    {
442  
    {
443  
        auto& s = *static_cast<S*>(sink);
443  
        auto& s = *static_cast<S*>(sink);
444  
        ::new(storage) WriteEofBuffersAwaitable(s.write_eof(buffers));
444  
        ::new(storage) WriteEofBuffersAwaitable(s.write_eof(buffers));
445  

445  

446  
        static constexpr write_awaitable_ops ops = {
446  
        static constexpr write_awaitable_ops ops = {
447  
            +[](void* p) {
447  
            +[](void* p) {
448  
                return static_cast<WriteEofBuffersAwaitable*>(p)->await_ready();
448  
                return static_cast<WriteEofBuffersAwaitable*>(p)->await_ready();
449  
            },
449  
            },
450  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
450  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
451  
                return detail::call_await_suspend(
451  
                return detail::call_await_suspend(
452  
                    static_cast<WriteEofBuffersAwaitable*>(p), h, env);
452  
                    static_cast<WriteEofBuffersAwaitable*>(p), h, env);
453  
            },
453  
            },
454  
            +[](void* p) {
454  
            +[](void* p) {
455  
                return static_cast<WriteEofBuffersAwaitable*>(p)->await_resume();
455  
                return static_cast<WriteEofBuffersAwaitable*>(p)->await_resume();
456  
            },
456  
            },
457  
            +[](void* p) noexcept {
457  
            +[](void* p) noexcept {
458  
                static_cast<WriteEofBuffersAwaitable*>(p)->~WriteEofBuffersAwaitable();
458  
                static_cast<WriteEofBuffersAwaitable*>(p)->~WriteEofBuffersAwaitable();
459  
            }
459  
            }
460  
        };
460  
        };
461  
        return &ops;
461  
        return &ops;
462  
    }
462  
    }
463  

463  

464  
    static eof_awaitable_ops const*
464  
    static eof_awaitable_ops const*
465  
    construct_eof_awaitable_impl(
465  
    construct_eof_awaitable_impl(
466  
        void* sink,
466  
        void* sink,
467  
        void* storage)
467  
        void* storage)
468  
    {
468  
    {
469  
        auto& s = *static_cast<S*>(sink);
469  
        auto& s = *static_cast<S*>(sink);
470  
        ::new(storage) EofAwaitable(s.write_eof());
470  
        ::new(storage) EofAwaitable(s.write_eof());
471  

471  

472  
        static constexpr eof_awaitable_ops ops = {
472  
        static constexpr eof_awaitable_ops ops = {
473  
            +[](void* p) {
473  
            +[](void* p) {
474  
                return static_cast<EofAwaitable*>(p)->await_ready();
474  
                return static_cast<EofAwaitable*>(p)->await_ready();
475  
            },
475  
            },
476  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
476  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
477  
                return detail::call_await_suspend(
477  
                return detail::call_await_suspend(
478  
                    static_cast<EofAwaitable*>(p), h, env);
478  
                    static_cast<EofAwaitable*>(p), h, env);
479  
            },
479  
            },
480  
            +[](void* p) {
480  
            +[](void* p) {
481  
                return static_cast<EofAwaitable*>(p)->await_resume();
481  
                return static_cast<EofAwaitable*>(p)->await_resume();
482  
            },
482  
            },
483  
            +[](void* p) noexcept {
483  
            +[](void* p) noexcept {
484  
                static_cast<EofAwaitable*>(p)->~EofAwaitable();
484  
                static_cast<EofAwaitable*>(p)->~EofAwaitable();
485  
            }
485  
            }
486  
        };
486  
        };
487  
        return &ops;
487  
        return &ops;
488  
    }
488  
    }
489  

489  

490  
    static constexpr std::size_t max4(
490  
    static constexpr std::size_t max4(
491  
        std::size_t a, std::size_t b,
491  
        std::size_t a, std::size_t b,
492  
        std::size_t c, std::size_t d) noexcept
492  
        std::size_t c, std::size_t d) noexcept
493  
    {
493  
    {
494  
        std::size_t ab = a > b ? a : b;
494  
        std::size_t ab = a > b ? a : b;
495  
        std::size_t cd = c > d ? c : d;
495  
        std::size_t cd = c > d ? c : d;
496  
        return ab > cd ? ab : cd;
496  
        return ab > cd ? ab : cd;
497  
    }
497  
    }
498  

498  

499  
    static constexpr std::size_t max_awaitable_size =
499  
    static constexpr std::size_t max_awaitable_size =
500  
        max4(sizeof(WriteSomeAwaitable),
500  
        max4(sizeof(WriteSomeAwaitable),
501  
             sizeof(WriteAwaitable),
501  
             sizeof(WriteAwaitable),
502  
             sizeof(WriteEofBuffersAwaitable),
502  
             sizeof(WriteEofBuffersAwaitable),
503  
             sizeof(EofAwaitable));
503  
             sizeof(EofAwaitable));
504  

504  

505  
    static constexpr std::size_t max_awaitable_align =
505  
    static constexpr std::size_t max_awaitable_align =
506  
        max4(alignof(WriteSomeAwaitable),
506  
        max4(alignof(WriteSomeAwaitable),
507  
             alignof(WriteAwaitable),
507  
             alignof(WriteAwaitable),
508  
             alignof(WriteEofBuffersAwaitable),
508  
             alignof(WriteEofBuffersAwaitable),
509  
             alignof(EofAwaitable));
509  
             alignof(EofAwaitable));
510  

510  

511  
    static constexpr vtable value = {
511  
    static constexpr vtable value = {
512  
        &construct_write_some_awaitable_impl,
512  
        &construct_write_some_awaitable_impl,
513  
        &construct_write_awaitable_impl,
513  
        &construct_write_awaitable_impl,
514  
        &construct_write_eof_buffers_awaitable_impl,
514  
        &construct_write_eof_buffers_awaitable_impl,
515  
        &construct_eof_awaitable_impl,
515  
        &construct_eof_awaitable_impl,
516  
        max_awaitable_size,
516  
        max_awaitable_size,
517  
        max_awaitable_align,
517  
        max_awaitable_align,
518  
        &do_destroy_impl
518  
        &do_destroy_impl
519  
    };
519  
    };
520  
};
520  
};
521  

521  

522  
inline
522  
inline
523  
any_write_sink::~any_write_sink()
523  
any_write_sink::~any_write_sink()
524  
{
524  
{
525  
    if(storage_)
525  
    if(storage_)
526  
    {
526  
    {
527  
        vt_->destroy(sink_);
527  
        vt_->destroy(sink_);
528  
        ::operator delete(storage_);
528  
        ::operator delete(storage_);
529  
    }
529  
    }
530  
    if(cached_awaitable_)
530  
    if(cached_awaitable_)
531  
    {
531  
    {
532  
        if(active_write_ops_)
532  
        if(active_write_ops_)
533  
            active_write_ops_->destroy(cached_awaitable_);
533  
            active_write_ops_->destroy(cached_awaitable_);
534  
        else if(active_eof_ops_)
534  
        else if(active_eof_ops_)
535  
            active_eof_ops_->destroy(cached_awaitable_);
535  
            active_eof_ops_->destroy(cached_awaitable_);
536  
        ::operator delete(cached_awaitable_);
536  
        ::operator delete(cached_awaitable_);
537  
    }
537  
    }
538  
}
538  
}
539  

539  

540  
inline any_write_sink&
540  
inline any_write_sink&
541  
any_write_sink::operator=(any_write_sink&& other) noexcept
541  
any_write_sink::operator=(any_write_sink&& other) noexcept
542  
{
542  
{
543  
    if(this != &other)
543  
    if(this != &other)
544  
    {
544  
    {
545  
        if(storage_)
545  
        if(storage_)
546  
        {
546  
        {
547  
            vt_->destroy(sink_);
547  
            vt_->destroy(sink_);
548  
            ::operator delete(storage_);
548  
            ::operator delete(storage_);
549  
        }
549  
        }
550  
        if(cached_awaitable_)
550  
        if(cached_awaitable_)
551  
        {
551  
        {
552  
            if(active_write_ops_)
552  
            if(active_write_ops_)
553  
                active_write_ops_->destroy(cached_awaitable_);
553  
                active_write_ops_->destroy(cached_awaitable_);
554  
            else if(active_eof_ops_)
554  
            else if(active_eof_ops_)
555  
                active_eof_ops_->destroy(cached_awaitable_);
555  
                active_eof_ops_->destroy(cached_awaitable_);
556  
            ::operator delete(cached_awaitable_);
556  
            ::operator delete(cached_awaitable_);
557  
        }
557  
        }
558  
        sink_ = std::exchange(other.sink_, nullptr);
558  
        sink_ = std::exchange(other.sink_, nullptr);
559  
        vt_ = std::exchange(other.vt_, nullptr);
559  
        vt_ = std::exchange(other.vt_, nullptr);
560  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
560  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
561  
        storage_ = std::exchange(other.storage_, nullptr);
561  
        storage_ = std::exchange(other.storage_, nullptr);
562  
        active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
562  
        active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
563  
        active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
563  
        active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
564  
    }
564  
    }
565  
    return *this;
565  
    return *this;
566  
}
566  
}
567  

567  

568  
template<WriteSink S>
568  
template<WriteSink S>
569  
    requires (!std::same_as<std::decay_t<S>, any_write_sink>)
569  
    requires (!std::same_as<std::decay_t<S>, any_write_sink>)
570  
any_write_sink::any_write_sink(S s)
570  
any_write_sink::any_write_sink(S s)
571  
    : vt_(&vtable_for_impl<S>::value)
571  
    : vt_(&vtable_for_impl<S>::value)
572  
{
572  
{
573  
    struct guard {
573  
    struct guard {
574  
        any_write_sink* self;
574  
        any_write_sink* self;
575  
        bool committed = false;
575  
        bool committed = false;
576  
        ~guard() {
576  
        ~guard() {
577  
            if(!committed && self->storage_) {
577  
            if(!committed && self->storage_) {
578  
                self->vt_->destroy(self->sink_);
578  
                self->vt_->destroy(self->sink_);
579  
                ::operator delete(self->storage_);
579  
                ::operator delete(self->storage_);
580  
                self->storage_ = nullptr;
580  
                self->storage_ = nullptr;
581  
                self->sink_ = nullptr;
581  
                self->sink_ = nullptr;
582  
            }
582  
            }
583  
        }
583  
        }
584  
    } g{this};
584  
    } g{this};
585  

585  

586  
    storage_ = ::operator new(sizeof(S));
586  
    storage_ = ::operator new(sizeof(S));
587  
    sink_ = ::new(storage_) S(std::move(s));
587  
    sink_ = ::new(storage_) S(std::move(s));
588  

588  

589  
    // Preallocate the awaitable storage (sized for max of write/eof)
589  
    // Preallocate the awaitable storage (sized for max of write/eof)
590  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
590  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
591  

591  

592  
    g.committed = true;
592  
    g.committed = true;
593  
}
593  
}
594  

594  

595  
template<WriteSink S>
595  
template<WriteSink S>
596  
any_write_sink::any_write_sink(S* s)
596  
any_write_sink::any_write_sink(S* s)
597  
    : sink_(s)
597  
    : sink_(s)
598  
    , vt_(&vtable_for_impl<S>::value)
598  
    , vt_(&vtable_for_impl<S>::value)
599  
{
599  
{
600  
    // Preallocate the awaitable storage (sized for max of write/eof)
600  
    // Preallocate the awaitable storage (sized for max of write/eof)
601  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
601  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
602  
}
602  
}
603  

603  

604  
inline auto
604  
inline auto
605  
any_write_sink::write_some_(
605  
any_write_sink::write_some_(
606  
    std::span<const_buffer const> buffers)
606  
    std::span<const_buffer const> buffers)
607  
{
607  
{
608  
    struct awaitable
608  
    struct awaitable
609  
    {
609  
    {
610  
        any_write_sink* self_;
610  
        any_write_sink* self_;
611  
        std::span<const_buffer const> buffers_;
611  
        std::span<const_buffer const> buffers_;
612  

612  

613  
        bool
613  
        bool
614  
        await_ready() const noexcept
614  
        await_ready() const noexcept
615  
        {
615  
        {
616  
            return false;
616  
            return false;
617  
        }
617  
        }
618  

618  

619  
        std::coroutine_handle<>
619  
        std::coroutine_handle<>
620  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
620  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
621  
        {
621  
        {
622  
            self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
622  
            self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
623  
                self_->sink_,
623  
                self_->sink_,
624  
                self_->cached_awaitable_,
624  
                self_->cached_awaitable_,
625  
                buffers_);
625  
                buffers_);
626  

626  

627  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
627  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
628  
                return h;
628  
                return h;
629  

629  

630  
            return self_->active_write_ops_->await_suspend(
630  
            return self_->active_write_ops_->await_suspend(
631  
                self_->cached_awaitable_, h, env);
631  
                self_->cached_awaitable_, h, env);
632  
        }
632  
        }
633  

633  

634  
        io_result<std::size_t>
634  
        io_result<std::size_t>
635  
        await_resume()
635  
        await_resume()
636  
        {
636  
        {
637  
            struct guard {
637  
            struct guard {
638  
                any_write_sink* self;
638  
                any_write_sink* self;
639  
                ~guard() {
639  
                ~guard() {
640  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
640  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
641  
                    self->active_write_ops_ = nullptr;
641  
                    self->active_write_ops_ = nullptr;
642  
                }
642  
                }
643  
            } g{self_};
643  
            } g{self_};
644  
            return self_->active_write_ops_->await_resume(
644  
            return self_->active_write_ops_->await_resume(
645  
                self_->cached_awaitable_);
645  
                self_->cached_awaitable_);
646  
        }
646  
        }
647  
    };
647  
    };
648  
    return awaitable{this, buffers};
648  
    return awaitable{this, buffers};
649  
}
649  
}
650  

650  

651  
inline auto
651  
inline auto
652  
any_write_sink::write_(
652  
any_write_sink::write_(
653  
    std::span<const_buffer const> buffers)
653  
    std::span<const_buffer const> buffers)
654  
{
654  
{
655  
    struct awaitable
655  
    struct awaitable
656  
    {
656  
    {
657  
        any_write_sink* self_;
657  
        any_write_sink* self_;
658  
        std::span<const_buffer const> buffers_;
658  
        std::span<const_buffer const> buffers_;
659  

659  

660  
        bool
660  
        bool
661  
        await_ready() const noexcept
661  
        await_ready() const noexcept
662  
        {
662  
        {
663  
            return false;
663  
            return false;
664  
        }
664  
        }
665  

665  

666  
        std::coroutine_handle<>
666  
        std::coroutine_handle<>
667  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
667  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
668  
        {
668  
        {
669  
            self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
669  
            self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
670  
                self_->sink_,
670  
                self_->sink_,
671  
                self_->cached_awaitable_,
671  
                self_->cached_awaitable_,
672  
                buffers_);
672  
                buffers_);
673  

673  

674  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
674  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
675  
                return h;
675  
                return h;
676  

676  

677  
            return self_->active_write_ops_->await_suspend(
677  
            return self_->active_write_ops_->await_suspend(
678  
                self_->cached_awaitable_, h, env);
678  
                self_->cached_awaitable_, h, env);
679  
        }
679  
        }
680  

680  

681  
        io_result<std::size_t>
681  
        io_result<std::size_t>
682  
        await_resume()
682  
        await_resume()
683  
        {
683  
        {
684  
            struct guard {
684  
            struct guard {
685  
                any_write_sink* self;
685  
                any_write_sink* self;
686  
                ~guard() {
686  
                ~guard() {
687  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
687  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
688  
                    self->active_write_ops_ = nullptr;
688  
                    self->active_write_ops_ = nullptr;
689  
                }
689  
                }
690  
            } g{self_};
690  
            } g{self_};
691  
            return self_->active_write_ops_->await_resume(
691  
            return self_->active_write_ops_->await_resume(
692  
                self_->cached_awaitable_);
692  
                self_->cached_awaitable_);
693  
        }
693  
        }
694  
    };
694  
    };
695  
    return awaitable{this, buffers};
695  
    return awaitable{this, buffers};
696  
}
696  
}
697  

697  

698  
inline auto
698  
inline auto
699  
any_write_sink::write_eof()
699  
any_write_sink::write_eof()
700  
{
700  
{
701  
    struct awaitable
701  
    struct awaitable
702  
    {
702  
    {
703  
        any_write_sink* self_;
703  
        any_write_sink* self_;
704  

704  

705  
        bool
705  
        bool
706  
        await_ready() const noexcept
706  
        await_ready() const noexcept
707  
        {
707  
        {
708  
            return false;
708  
            return false;
709  
        }
709  
        }
710  

710  

711  
        std::coroutine_handle<>
711  
        std::coroutine_handle<>
712  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
712  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
713  
        {
713  
        {
714  
            // Construct the underlying awaitable into cached storage
714  
            // Construct the underlying awaitable into cached storage
715  
            self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
715  
            self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
716  
                self_->sink_,
716  
                self_->sink_,
717  
                self_->cached_awaitable_);
717  
                self_->cached_awaitable_);
718  

718  

719  
            // Check if underlying is immediately ready
719  
            // Check if underlying is immediately ready
720  
            if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
720  
            if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
721  
                return h;
721  
                return h;
722  

722  

723  
            // Forward to underlying awaitable
723  
            // Forward to underlying awaitable
724  
            return self_->active_eof_ops_->await_suspend(
724  
            return self_->active_eof_ops_->await_suspend(
725  
                self_->cached_awaitable_, h, env);
725  
                self_->cached_awaitable_, h, env);
726  
        }
726  
        }
727  

727  

728  
        io_result<>
728  
        io_result<>
729  
        await_resume()
729  
        await_resume()
730  
        {
730  
        {
731  
            struct guard {
731  
            struct guard {
732  
                any_write_sink* self;
732  
                any_write_sink* self;
733  
                ~guard() {
733  
                ~guard() {
734  
                    self->active_eof_ops_->destroy(self->cached_awaitable_);
734  
                    self->active_eof_ops_->destroy(self->cached_awaitable_);
735  
                    self->active_eof_ops_ = nullptr;
735  
                    self->active_eof_ops_ = nullptr;
736  
                }
736  
                }
737  
            } g{self_};
737  
            } g{self_};
738  
            return self_->active_eof_ops_->await_resume(
738  
            return self_->active_eof_ops_->await_resume(
739  
                self_->cached_awaitable_);
739  
                self_->cached_awaitable_);
740  
        }
740  
        }
741  
    };
741  
    };
742  
    return awaitable{this};
742  
    return awaitable{this};
743  
}
743  
}
744  

744  

745  
inline auto
745  
inline auto
746  
any_write_sink::write_eof_buffers_(
746  
any_write_sink::write_eof_buffers_(
747  
    std::span<const_buffer const> buffers)
747  
    std::span<const_buffer const> buffers)
748  
{
748  
{
749  
    struct awaitable
749  
    struct awaitable
750  
    {
750  
    {
751  
        any_write_sink* self_;
751  
        any_write_sink* self_;
752  
        std::span<const_buffer const> buffers_;
752  
        std::span<const_buffer const> buffers_;
753  

753  

754  
        bool
754  
        bool
755  
        await_ready() const noexcept
755  
        await_ready() const noexcept
756  
        {
756  
        {
757  
            return false;
757  
            return false;
758  
        }
758  
        }
759  

759  

760  
        std::coroutine_handle<>
760  
        std::coroutine_handle<>
761  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
761  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
762  
        {
762  
        {
763  
            self_->active_write_ops_ =
763  
            self_->active_write_ops_ =
764  
                self_->vt_->construct_write_eof_buffers_awaitable(
764  
                self_->vt_->construct_write_eof_buffers_awaitable(
765  
                    self_->sink_,
765  
                    self_->sink_,
766  
                    self_->cached_awaitable_,
766  
                    self_->cached_awaitable_,
767  
                    buffers_);
767  
                    buffers_);
768  

768  

769  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
769  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
770  
                return h;
770  
                return h;
771  

771  

772  
            return self_->active_write_ops_->await_suspend(
772  
            return self_->active_write_ops_->await_suspend(
773  
                self_->cached_awaitable_, h, env);
773  
                self_->cached_awaitable_, h, env);
774  
        }
774  
        }
775  

775  

776  
        io_result<std::size_t>
776  
        io_result<std::size_t>
777  
        await_resume()
777  
        await_resume()
778  
        {
778  
        {
779  
            struct guard {
779  
            struct guard {
780  
                any_write_sink* self;
780  
                any_write_sink* self;
781  
                ~guard() {
781  
                ~guard() {
782  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
782  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
783  
                    self->active_write_ops_ = nullptr;
783  
                    self->active_write_ops_ = nullptr;
784  
                }
784  
                }
785  
            } g{self_};
785  
            } g{self_};
786  
            return self_->active_write_ops_->await_resume(
786  
            return self_->active_write_ops_->await_resume(
787  
                self_->cached_awaitable_);
787  
                self_->cached_awaitable_);
788  
        }
788  
        }
789  
    };
789  
    };
790  
    return awaitable{this, buffers};
790  
    return awaitable{this, buffers};
791  
}
791  
}
792  

792  

793  
template<ConstBufferSequence CB>
793  
template<ConstBufferSequence CB>
794  
auto
794  
auto
795  
any_write_sink::write_some(CB buffers)
795  
any_write_sink::write_some(CB buffers)
796  
{
796  
{
797  
    struct awaitable
797  
    struct awaitable
798  
    {
798  
    {
799  
        any_write_sink* self_;
799  
        any_write_sink* self_;
800  
        const_buffer_array<detail::max_iovec_> ba_;
800  
        const_buffer_array<detail::max_iovec_> ba_;
801  

801  

802  
        awaitable(
802  
        awaitable(
803  
            any_write_sink* self,
803  
            any_write_sink* self,
804  
            CB const& buffers)
804  
            CB const& buffers)
805  
            : self_(self)
805  
            : self_(self)
806  
            , ba_(buffers)
806  
            , ba_(buffers)
807  
        {
807  
        {
808  
        }
808  
        }
809  

809  

810  
        bool
810  
        bool
811  
        await_ready() const noexcept
811  
        await_ready() const noexcept
812  
        {
812  
        {
813  
            return ba_.to_span().empty();
813  
            return ba_.to_span().empty();
814  
        }
814  
        }
815  

815  

816  
        std::coroutine_handle<>
816  
        std::coroutine_handle<>
817  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
817  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
818  
        {
818  
        {
819  
            self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
819  
            self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
820  
                self_->sink_,
820  
                self_->sink_,
821  
                self_->cached_awaitable_,
821  
                self_->cached_awaitable_,
822  
                ba_.to_span());
822  
                ba_.to_span());
823  

823  

824  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
824  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
825  
                return h;
825  
                return h;
826  

826  

827  
            return self_->active_write_ops_->await_suspend(
827  
            return self_->active_write_ops_->await_suspend(
828  
                self_->cached_awaitable_, h, env);
828  
                self_->cached_awaitable_, h, env);
829  
        }
829  
        }
830  

830  

831  
        io_result<std::size_t>
831  
        io_result<std::size_t>
832  
        await_resume()
832  
        await_resume()
833  
        {
833  
        {
834  
            if(ba_.to_span().empty())
834  
            if(ba_.to_span().empty())
835  
                return {{}, 0};
835  
                return {{}, 0};
836  

836  

837  
            struct guard {
837  
            struct guard {
838  
                any_write_sink* self;
838  
                any_write_sink* self;
839  
                ~guard() {
839  
                ~guard() {
840  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
840  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
841  
                    self->active_write_ops_ = nullptr;
841  
                    self->active_write_ops_ = nullptr;
842  
                }
842  
                }
843  
            } g{self_};
843  
            } g{self_};
844  
            return self_->active_write_ops_->await_resume(
844  
            return self_->active_write_ops_->await_resume(
845  
                self_->cached_awaitable_);
845  
                self_->cached_awaitable_);
846  
        }
846  
        }
847  
    };
847  
    };
848  
    return awaitable{this, buffers};
848  
    return awaitable{this, buffers};
849  
}
849  
}
850  

850  

851  
template<ConstBufferSequence CB>
851  
template<ConstBufferSequence CB>
852  
io_task<std::size_t>
852  
io_task<std::size_t>
853  
any_write_sink::write(CB buffers)
853  
any_write_sink::write(CB buffers)
854  
{
854  
{
855  
    buffer_param<CB> bp(buffers);
855  
    buffer_param<CB> bp(buffers);
856  
    std::size_t total = 0;
856  
    std::size_t total = 0;
857  

857  

858  
    for(;;)
858  
    for(;;)
859  
    {
859  
    {
860  
        auto bufs = bp.data();
860  
        auto bufs = bp.data();
861  
        if(bufs.empty())
861  
        if(bufs.empty())
862  
            break;
862  
            break;
863  

863  

864  
        auto [ec, n] = co_await write_(bufs);
864  
        auto [ec, n] = co_await write_(bufs);
865  
        total += n;
865  
        total += n;
866  
        if(ec)
866  
        if(ec)
867  
            co_return {ec, total};
867  
            co_return {ec, total};
868  
        bp.consume(n);
868  
        bp.consume(n);
869  
    }
869  
    }
870  

870  

871  
    co_return {{}, total};
871  
    co_return {{}, total};
872  
}
872  
}
873  

873  

874  
template<ConstBufferSequence CB>
874  
template<ConstBufferSequence CB>
875  
io_task<std::size_t>
875  
io_task<std::size_t>
876  
any_write_sink::write_eof(CB buffers)
876  
any_write_sink::write_eof(CB buffers)
877  
{
877  
{
878  
    const_buffer_param<CB> bp(buffers);
878  
    const_buffer_param<CB> bp(buffers);
879  
    std::size_t total = 0;
879  
    std::size_t total = 0;
880  

880  

881  
    for(;;)
881  
    for(;;)
882  
    {
882  
    {
883  
        auto bufs = bp.data();
883  
        auto bufs = bp.data();
884  
        if(bufs.empty())
884  
        if(bufs.empty())
885  
        {
885  
        {
886  
            auto [ec] = co_await write_eof();
886  
            auto [ec] = co_await write_eof();
887  
            co_return {ec, total};
887  
            co_return {ec, total};
888  
        }
888  
        }
889  

889  

890  
        if(! bp.more())
890  
        if(! bp.more())
891  
        {
891  
        {
892  
            // Last window — send atomically with EOF
892  
            // Last window — send atomically with EOF
893  
            auto [ec, n] = co_await write_eof_buffers_(bufs);
893  
            auto [ec, n] = co_await write_eof_buffers_(bufs);
894  
            total += n;
894  
            total += n;
895  
            co_return {ec, total};
895  
            co_return {ec, total};
896  
        }
896  
        }
897  

897  

898  
        auto [ec, n] = co_await write_(bufs);
898  
        auto [ec, n] = co_await write_(bufs);
899  
        total += n;
899  
        total += n;
900  
        if(ec)
900  
        if(ec)
901  
            co_return {ec, total};
901  
            co_return {ec, total};
902  
        bp.consume(n);
902  
        bp.consume(n);
903  
    }
903  
    }
904  
}
904  
}
905  

905  

906  
} // namespace capy
906  
} // namespace capy
907  
} // namespace boost
907  
} // namespace boost
908  

908  

909  
#endif
909  
#endif