//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_copy.hpp>
16  
#include <boost/capy/buffers/buffer_copy.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
18  
#include <boost/capy/concept/buffer_sink.hpp>
18  
#include <boost/capy/concept/buffer_sink.hpp>
19  
#include <boost/capy/concept/io_awaitable.hpp>
19  
#include <boost/capy/concept/io_awaitable.hpp>
20  
#include <boost/capy/concept/write_sink.hpp>
20  
#include <boost/capy/concept/write_sink.hpp>
21  
#include <boost/capy/ex/io_env.hpp>
21  
#include <boost/capy/ex/io_env.hpp>
22  
#include <boost/capy/io_result.hpp>
22  
#include <boost/capy/io_result.hpp>
23  
#include <boost/capy/io_task.hpp>
23  
#include <boost/capy/io_task.hpp>
24  

24  

25  
#include <concepts>
25  
#include <concepts>
26  
#include <coroutine>
26  
#include <coroutine>
27  
#include <cstddef>
27  
#include <cstddef>
28  
#include <exception>
28  
#include <exception>
29  
#include <new>
29  
#include <new>
30  
#include <span>
30  
#include <span>
31  
#include <stop_token>
31  
#include <stop_token>
32  
#include <system_error>
32  
#include <system_error>
33  
#include <utility>
33  
#include <utility>
34  

34  

35  
namespace boost {
35  
namespace boost {
36  
namespace capy {
36  
namespace capy {
37  

37  

38  
/** Type-erased wrapper for any BufferSink.
38  
/** Type-erased wrapper for any BufferSink.
39  

39  

40  
    This class provides type erasure for any type satisfying the
40  
    This class provides type erasure for any type satisfying the
41  
    @ref BufferSink concept, enabling runtime polymorphism for
41  
    @ref BufferSink concept, enabling runtime polymorphism for
42  
    buffer sink operations. It uses cached awaitable storage to achieve
42  
    buffer sink operations. It uses cached awaitable storage to achieve
43  
    zero steady-state allocation after construction.
43  
    zero steady-state allocation after construction.
44  

44  

45  
    The wrapper exposes two interfaces for producing data:
45  
    The wrapper exposes two interfaces for producing data:
46  
    the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
46  
    the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
47  
    and the @ref WriteSink interface (`write_some`, `write`,
47  
    and the @ref WriteSink interface (`write_some`, `write`,
48  
    `write_eof`). Choose the interface that matches how your data
48  
    `write_eof`). Choose the interface that matches how your data
49  
    is produced:
49  
    is produced:
50  

50  

51  
    @par Choosing an Interface
51  
    @par Choosing an Interface
52  

52  

53  
    Use the **BufferSink** interface when you are a generator that
53  
    Use the **BufferSink** interface when you are a generator that
54  
    produces data into externally-provided buffers. The sink owns
54  
    produces data into externally-provided buffers. The sink owns
55  
    the memory; you call @ref prepare to obtain writable buffers,
55  
    the memory; you call @ref prepare to obtain writable buffers,
56  
    fill them, then call @ref commit or @ref commit_eof.
56  
    fill them, then call @ref commit or @ref commit_eof.
57  

57  

58  
    Use the **WriteSink** interface when you already have buffers
58  
    Use the **WriteSink** interface when you already have buffers
59  
    containing the data to write:
59  
    containing the data to write:
60  
    - If the entire body is available up front, call
60  
    - If the entire body is available up front, call
61  
      @ref write_eof(buffers) to send everything atomically.
61  
      @ref write_eof(buffers) to send everything atomically.
62  
    - If data arrives incrementally, call @ref write or
62  
    - If data arrives incrementally, call @ref write or
63  
      @ref write_some in a loop, then @ref write_eof() when done.
63  
      @ref write_some in a loop, then @ref write_eof() when done.
64  
      Prefer `write` (complete) unless your streaming pattern
64  
      Prefer `write` (complete) unless your streaming pattern
65  
      benefits from partial writes via `write_some`.
65  
      benefits from partial writes via `write_some`.
66  

66  

67  
    If the wrapped type only satisfies @ref BufferSink, the
67  
    If the wrapped type only satisfies @ref BufferSink, the
68  
    @ref WriteSink operations are provided automatically.
68  
    @ref WriteSink operations are provided automatically.
69  

69  

70  
    @par Construction Modes
70  
    @par Construction Modes
71  

71  

72  
    - **Owning**: Pass by value to transfer ownership. The wrapper
72  
    - **Owning**: Pass by value to transfer ownership. The wrapper
73  
      allocates storage and owns the sink.
73  
      allocates storage and owns the sink.
74  
    - **Reference**: Pass a pointer to wrap without ownership. The
74  
    - **Reference**: Pass a pointer to wrap without ownership. The
75  
      pointed-to sink must outlive this wrapper.
75  
      pointed-to sink must outlive this wrapper.
76  

76  

77  
    @par Awaitable Preallocation
77  
    @par Awaitable Preallocation
78  
    The constructor preallocates storage for the type-erased awaitable.
78  
    The constructor preallocates storage for the type-erased awaitable.
79  
    This reserves all virtual address space at server startup
79  
    This reserves all virtual address space at server startup
80  
    so memory usage can be measured up front, rather than
80  
    so memory usage can be measured up front, rather than
81  
    allocating piecemeal as traffic arrives.
81  
    allocating piecemeal as traffic arrives.
82  

82  

83  
    @par Thread Safety
83  
    @par Thread Safety
84  
    Not thread-safe. Concurrent operations on the same wrapper
84  
    Not thread-safe. Concurrent operations on the same wrapper
85  
    are undefined behavior.
85  
    are undefined behavior.
86  

86  

87  
    @par Example
87  
    @par Example
88  
    @code
88  
    @code
89  
    // Owning - takes ownership of the sink
89  
    // Owning - takes ownership of the sink
90  
    any_buffer_sink abs(some_buffer_sink{args...});
90  
    any_buffer_sink abs(some_buffer_sink{args...});
91  

91  

92  
    // Reference - wraps without ownership
92  
    // Reference - wraps without ownership
93  
    some_buffer_sink sink;
93  
    some_buffer_sink sink;
94  
    any_buffer_sink abs(&sink);
94  
    any_buffer_sink abs(&sink);
95  

95  

96  
    // BufferSink interface: generate into callee-owned buffers
96  
    // BufferSink interface: generate into callee-owned buffers
97  
    mutable_buffer arr[16];
97  
    mutable_buffer arr[16];
98  
    auto bufs = abs.prepare(arr);
98  
    auto bufs = abs.prepare(arr);
99  
    // Write data into bufs[0..bufs.size())
99  
    // Write data into bufs[0..bufs.size())
100  
    auto [ec] = co_await abs.commit(bytes_written);
100  
    auto [ec] = co_await abs.commit(bytes_written);
101  
    auto [ec2] = co_await abs.commit_eof(0);
101  
    auto [ec2] = co_await abs.commit_eof(0);
102  

102  

103  
    // WriteSink interface: send caller-owned buffers
103  
    // WriteSink interface: send caller-owned buffers
104  
    auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
104  
    auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
105  
    auto [ec4] = co_await abs.write_eof();
105  
    auto [ec4] = co_await abs.write_eof();
106  

106  

107  
    // Or send everything at once
107  
    // Or send everything at once
108  
    auto [ec5, n2] = co_await abs.write_eof(
108  
    auto [ec5, n2] = co_await abs.write_eof(
109  
        make_buffer(body_data));
109  
        make_buffer(body_data));
110  
    @endcode
110  
    @endcode
111  

111  

112  
    @see any_buffer_source, BufferSink, WriteSink
112  
    @see any_buffer_source, BufferSink, WriteSink
113  
*/
113  
*/
114  
class any_buffer_sink
114  
class any_buffer_sink
115  
{
115  
{
116  
    struct vtable;
116  
    struct vtable;
117  
    struct awaitable_ops;
117  
    struct awaitable_ops;
118  
    struct write_awaitable_ops;
118  
    struct write_awaitable_ops;
119  

119  

120  
    template<BufferSink S>
120  
    template<BufferSink S>
121  
    struct vtable_for_impl;
121  
    struct vtable_for_impl;
122  

122  

123  
    // hot-path members first for cache locality
123  
    // hot-path members first for cache locality
124  
    void* sink_ = nullptr;
124  
    void* sink_ = nullptr;
125  
    vtable const* vt_ = nullptr;
125  
    vtable const* vt_ = nullptr;
126  
    void* cached_awaitable_ = nullptr;
126  
    void* cached_awaitable_ = nullptr;
127  
    awaitable_ops const* active_ops_ = nullptr;
127  
    awaitable_ops const* active_ops_ = nullptr;
128  
    write_awaitable_ops const* active_write_ops_ = nullptr;
128  
    write_awaitable_ops const* active_write_ops_ = nullptr;
129  
    void* storage_ = nullptr;
129  
    void* storage_ = nullptr;
130  

130  

131  
public:
131  
public:
132  
    /** Destructor.
132  
    /** Destructor.
133  

133  

134  
        Destroys the owned sink (if any) and releases the cached
134  
        Destroys the owned sink (if any) and releases the cached
135  
        awaitable storage.
135  
        awaitable storage.
136  
    */
136  
    */
137  
    ~any_buffer_sink();
137  
    ~any_buffer_sink();
138  

138  

139  
    /** Construct a default instance.
139  
    /** Construct a default instance.
140  

140  

141  
        Constructs an empty wrapper. Operations on a default-constructed
141  
        Constructs an empty wrapper. Operations on a default-constructed
142  
        wrapper result in undefined behavior.
142  
        wrapper result in undefined behavior.
143  
    */
143  
    */
144  
    any_buffer_sink() = default;
144  
    any_buffer_sink() = default;
145  

145  

146  
    /** Non-copyable.
146  
    /** Non-copyable.
147  

147  

148  
        The awaitable cache is per-instance and cannot be shared.
148  
        The awaitable cache is per-instance and cannot be shared.
149  
    */
149  
    */
150  
    any_buffer_sink(any_buffer_sink const&) = delete;
150  
    any_buffer_sink(any_buffer_sink const&) = delete;
151  
    any_buffer_sink& operator=(any_buffer_sink const&) = delete;
151  
    any_buffer_sink& operator=(any_buffer_sink const&) = delete;
152  

152  

153  
    /** Construct by moving.
153  
    /** Construct by moving.
154  

154  

155  
        Transfers ownership of the wrapped sink (if owned) and
155  
        Transfers ownership of the wrapped sink (if owned) and
156  
        cached awaitable storage from `other`. After the move, `other` is
156  
        cached awaitable storage from `other`. After the move, `other` is
157  
        in a default-constructed state.
157  
        in a default-constructed state.
158  

158  

159  
        @param other The wrapper to move from.
159  
        @param other The wrapper to move from.
160  
    */
160  
    */
161  
    any_buffer_sink(any_buffer_sink&& other) noexcept
161  
    any_buffer_sink(any_buffer_sink&& other) noexcept
162  
        : sink_(std::exchange(other.sink_, nullptr))
162  
        : sink_(std::exchange(other.sink_, nullptr))
163  
        , vt_(std::exchange(other.vt_, nullptr))
163  
        , vt_(std::exchange(other.vt_, nullptr))
164  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
164  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
165  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
165  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
166  
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
166  
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
167  
        , storage_(std::exchange(other.storage_, nullptr))
167  
        , storage_(std::exchange(other.storage_, nullptr))
168  
    {
168  
    {
169  
    }
169  
    }
170  

170  

171  
    /** Assign by moving.
171  
    /** Assign by moving.
172  

172  

173  
        Destroys any owned sink and releases existing resources,
173  
        Destroys any owned sink and releases existing resources,
174  
        then transfers ownership from `other`.
174  
        then transfers ownership from `other`.
175  

175  

176  
        @param other The wrapper to move from.
176  
        @param other The wrapper to move from.
177  
        @return Reference to this wrapper.
177  
        @return Reference to this wrapper.
178  
    */
178  
    */
179  
    any_buffer_sink&
179  
    any_buffer_sink&
180  
    operator=(any_buffer_sink&& other) noexcept;
180  
    operator=(any_buffer_sink&& other) noexcept;
181  

181  

182  
    /** Construct by taking ownership of a BufferSink.
182  
    /** Construct by taking ownership of a BufferSink.
183  

183  

184  
        Allocates storage and moves the sink into this wrapper.
184  
        Allocates storage and moves the sink into this wrapper.
185  
        The wrapper owns the sink and will destroy it. If `S` also
185  
        The wrapper owns the sink and will destroy it. If `S` also
186  
        satisfies @ref WriteSink, native write operations are
186  
        satisfies @ref WriteSink, native write operations are
187  
        forwarded through the virtual boundary.
187  
        forwarded through the virtual boundary.
188  

188  

189  
        @param s The sink to take ownership of.
189  
        @param s The sink to take ownership of.
190  
    */
190  
    */
191  
    template<BufferSink S>
191  
    template<BufferSink S>
192  
        requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
192  
        requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
193  
    any_buffer_sink(S s);
193  
    any_buffer_sink(S s);
194  

194  

195  
    /** Construct by wrapping a BufferSink without ownership.
195  
    /** Construct by wrapping a BufferSink without ownership.
196  

196  

197  
        Wraps the given sink by pointer. The sink must remain
197  
        Wraps the given sink by pointer. The sink must remain
198  
        valid for the lifetime of this wrapper. If `S` also
198  
        valid for the lifetime of this wrapper. If `S` also
199  
        satisfies @ref WriteSink, native write operations are
199  
        satisfies @ref WriteSink, native write operations are
200  
        forwarded through the virtual boundary.
200  
        forwarded through the virtual boundary.
201  

201  

202  
        @param s Pointer to the sink to wrap.
202  
        @param s Pointer to the sink to wrap.
203  
    */
203  
    */
204  
    template<BufferSink S>
204  
    template<BufferSink S>
205  
    any_buffer_sink(S* s);
205  
    any_buffer_sink(S* s);
206  

206  

207  
    /** Check if the wrapper contains a valid sink.
207  
    /** Check if the wrapper contains a valid sink.
208  

208  

209  
        @return `true` if wrapping a sink, `false` if default-constructed
209  
        @return `true` if wrapping a sink, `false` if default-constructed
210  
            or moved-from.
210  
            or moved-from.
211  
    */
211  
    */
212  
    bool
212  
    bool
213  
    has_value() const noexcept
213  
    has_value() const noexcept
214  
    {
214  
    {
215  
        return sink_ != nullptr;
215  
        return sink_ != nullptr;
216  
    }
216  
    }
217  

217  

218  
    /** Check if the wrapper contains a valid sink.
218  
    /** Check if the wrapper contains a valid sink.
219  

219  

220  
        @return `true` if wrapping a sink, `false` if default-constructed
220  
        @return `true` if wrapping a sink, `false` if default-constructed
221  
            or moved-from.
221  
            or moved-from.
222  
    */
222  
    */
223  
    explicit
223  
    explicit
224  
    operator bool() const noexcept
224  
    operator bool() const noexcept
225  
    {
225  
    {
226  
        return has_value();
226  
        return has_value();
227  
    }
227  
    }
228  

228  

229  
    /** Prepare writable buffers.
229  
    /** Prepare writable buffers.
230  

230  

231  
        Fills the provided span with mutable buffer descriptors
231  
        Fills the provided span with mutable buffer descriptors
232  
        pointing to the underlying sink's internal storage. This
232  
        pointing to the underlying sink's internal storage. This
233  
        operation is synchronous.
233  
        operation is synchronous.
234  

234  

235  
        @param dest Span of mutable_buffer to fill.
235  
        @param dest Span of mutable_buffer to fill.
236  

236  

237  
        @return A span of filled buffers.
237  
        @return A span of filled buffers.
238  

238  

239  
        @par Preconditions
239  
        @par Preconditions
240  
        The wrapper must contain a valid sink (`has_value() == true`).
240  
        The wrapper must contain a valid sink (`has_value() == true`).
241  
    */
241  
    */
242  
    std::span<mutable_buffer>
242  
    std::span<mutable_buffer>
243  
    prepare(std::span<mutable_buffer> dest);
243  
    prepare(std::span<mutable_buffer> dest);
244  

244  

245  
    /** Commit bytes written to the prepared buffers.
245  
    /** Commit bytes written to the prepared buffers.
246  

246  

247  
        Commits `n` bytes written to the buffers returned by the
247  
        Commits `n` bytes written to the buffers returned by the
248  
        most recent call to @ref prepare. The operation may trigger
248  
        most recent call to @ref prepare. The operation may trigger
249  
        underlying I/O.
249  
        underlying I/O.
250  

250  

251  
        @param n The number of bytes to commit.
251  
        @param n The number of bytes to commit.
252  

252  

253  
        @return An awaitable yielding `(error_code)`.
253  
        @return An awaitable yielding `(error_code)`.
254  

254  

255  
        @par Preconditions
255  
        @par Preconditions
256  
        The wrapper must contain a valid sink (`has_value() == true`).
256  
        The wrapper must contain a valid sink (`has_value() == true`).
257  
    */
257  
    */
258  
    auto
258  
    auto
259  
    commit(std::size_t n);
259  
    commit(std::size_t n);
260  

260  

261  
    /** Commit final bytes and signal end-of-stream.
261  
    /** Commit final bytes and signal end-of-stream.
262  

262  

263  
        Commits `n` bytes written to the buffers returned by the
263  
        Commits `n` bytes written to the buffers returned by the
264  
        most recent call to @ref prepare and finalizes the sink.
264  
        most recent call to @ref prepare and finalizes the sink.
265  
        After success, no further operations are permitted.
265  
        After success, no further operations are permitted.
266  

266  

267  
        @param n The number of bytes to commit.
267  
        @param n The number of bytes to commit.
268  

268  

269  
        @return An awaitable yielding `(error_code)`.
269  
        @return An awaitable yielding `(error_code)`.
270  

270  

271  
        @par Preconditions
271  
        @par Preconditions
272  
        The wrapper must contain a valid sink (`has_value() == true`).
272  
        The wrapper must contain a valid sink (`has_value() == true`).
273  
    */
273  
    */
274  
    auto
274  
    auto
275  
    commit_eof(std::size_t n);
275  
    commit_eof(std::size_t n);
276  

276  

277  
    /** Write some data from a buffer sequence.
277  
    /** Write some data from a buffer sequence.
278  

278  

279  
        Writes one or more bytes from the buffer sequence to the
279  
        Writes one or more bytes from the buffer sequence to the
280  
        underlying sink. May consume less than the full sequence.
280  
        underlying sink. May consume less than the full sequence.
281  

281  

282  
        When the wrapped type provides native @ref WriteSink support,
282  
        When the wrapped type provides native @ref WriteSink support,
283  
        the operation forwards directly. Otherwise it is synthesized
283  
        the operation forwards directly. Otherwise it is synthesized
284  
        from @ref prepare and @ref commit with a buffer copy.
284  
        from @ref prepare and @ref commit with a buffer copy.
285  

285  

286  
        @param buffers The buffer sequence to write.
286  
        @param buffers The buffer sequence to write.
287  

287  

288  
        @return An awaitable yielding `(error_code,std::size_t)`.
288  
        @return An awaitable yielding `(error_code,std::size_t)`.
289  

289  

290  
        @par Preconditions
290  
        @par Preconditions
291  
        The wrapper must contain a valid sink (`has_value() == true`).
291  
        The wrapper must contain a valid sink (`has_value() == true`).
292  
    */
292  
    */
293  
    template<ConstBufferSequence CB>
293  
    template<ConstBufferSequence CB>
294  
    io_task<std::size_t>
294  
    io_task<std::size_t>
295  
    write_some(CB buffers);
295  
    write_some(CB buffers);
296  

296  

297  
    /** Write all data from a buffer sequence.
297  
    /** Write all data from a buffer sequence.
298  

298  

299  
        Writes all data from the buffer sequence to the underlying
299  
        Writes all data from the buffer sequence to the underlying
300  
        sink. This method satisfies the @ref WriteSink concept.
300  
        sink. This method satisfies the @ref WriteSink concept.
301  

301  

302  
        When the wrapped type provides native @ref WriteSink support,
302  
        When the wrapped type provides native @ref WriteSink support,
303  
        each window is forwarded directly. Otherwise the data is
303  
        each window is forwarded directly. Otherwise the data is
304  
        copied into the sink via @ref prepare and @ref commit.
304  
        copied into the sink via @ref prepare and @ref commit.
305  

305  

306  
        @param buffers The buffer sequence to write.
306  
        @param buffers The buffer sequence to write.
307  

307  

308  
        @return An awaitable yielding `(error_code,std::size_t)`.
308  
        @return An awaitable yielding `(error_code,std::size_t)`.
309  

309  

310  
        @par Preconditions
310  
        @par Preconditions
311  
        The wrapper must contain a valid sink (`has_value() == true`).
311  
        The wrapper must contain a valid sink (`has_value() == true`).
312  
    */
312  
    */
313  
    template<ConstBufferSequence CB>
313  
    template<ConstBufferSequence CB>
314  
    io_task<std::size_t>
314  
    io_task<std::size_t>
315  
    write(CB buffers);
315  
    write(CB buffers);
316  

316  

317  
    /** Atomically write data and signal end-of-stream.
317  
    /** Atomically write data and signal end-of-stream.
318  

318  

319  
        Writes all data from the buffer sequence to the underlying
319  
        Writes all data from the buffer sequence to the underlying
320  
        sink and then signals end-of-stream.
320  
        sink and then signals end-of-stream.
321  

321  

322  
        When the wrapped type provides native @ref WriteSink support,
322  
        When the wrapped type provides native @ref WriteSink support,
323  
        the final window is sent atomically via the underlying
323  
        the final window is sent atomically via the underlying
324  
        `write_eof(buffers)`. Otherwise the data is synthesized
324  
        `write_eof(buffers)`. Otherwise the data is synthesized
325  
        through @ref prepare, @ref commit, and @ref commit_eof.
325  
        through @ref prepare, @ref commit, and @ref commit_eof.
326  

326  

327  
        @param buffers The buffer sequence to write.
327  
        @param buffers The buffer sequence to write.
328  

328  

329  
        @return An awaitable yielding `(error_code,std::size_t)`.
329  
        @return An awaitable yielding `(error_code,std::size_t)`.
330  

330  

331  
        @par Preconditions
331  
        @par Preconditions
332  
        The wrapper must contain a valid sink (`has_value() == true`).
332  
        The wrapper must contain a valid sink (`has_value() == true`).
333  
    */
333  
    */
334  
    template<ConstBufferSequence CB>
334  
    template<ConstBufferSequence CB>
335  
    io_task<std::size_t>
335  
    io_task<std::size_t>
336  
    write_eof(CB buffers);
336  
    write_eof(CB buffers);
337  

337  

338  
    /** Signal end-of-stream.
338  
    /** Signal end-of-stream.
339  

339  

340  
        Indicates that no more data will be written to the sink.
340  
        Indicates that no more data will be written to the sink.
341  
        This method satisfies the @ref WriteSink concept.
341  
        This method satisfies the @ref WriteSink concept.
342  

342  

343  
        When the wrapped type provides native @ref WriteSink support,
343  
        When the wrapped type provides native @ref WriteSink support,
344  
        the underlying `write_eof()` is called. Otherwise the
344  
        the underlying `write_eof()` is called. Otherwise the
345  
        operation is implemented as `commit_eof(0)`.
345  
        operation is implemented as `commit_eof(0)`.
346  

346  

347  
        @return An awaitable yielding `(error_code)`.
347  
        @return An awaitable yielding `(error_code)`.
348  

348  

349  
        @par Preconditions
349  
        @par Preconditions
350  
        The wrapper must contain a valid sink (`has_value() == true`).
350  
        The wrapper must contain a valid sink (`has_value() == true`).
351  
    */
351  
    */
352  
    auto
352  
    auto
353  
    write_eof();
353  
    write_eof();
354  

354  

355  
protected:
355  
protected:
356  
    /** Rebind to a new sink after move.
356  
    /** Rebind to a new sink after move.
357  

357  

358  
        Updates the internal pointer to reference a new sink object.
358  
        Updates the internal pointer to reference a new sink object.
359  
        Used by owning wrappers after move assignment when the owned
359  
        Used by owning wrappers after move assignment when the owned
360  
        object has moved to a new location.
360  
        object has moved to a new location.
361  

361  

362  
        @param new_sink The new sink to bind to. Must be the same
362  
        @param new_sink The new sink to bind to. Must be the same
363  
            type as the original sink.
363  
            type as the original sink.
364  

364  

365  
        @note Terminates if called with a sink of different type
365  
        @note Terminates if called with a sink of different type
366  
            than the original.
366  
            than the original.
367  
    */
367  
    */
368  
    template<BufferSink S>
368  
    template<BufferSink S>
369  
    void
369  
    void
370  
    rebind(S& new_sink) noexcept
370  
    rebind(S& new_sink) noexcept
371  
    {
371  
    {
372  
        if(vt_ != &vtable_for_impl<S>::value)
372  
        if(vt_ != &vtable_for_impl<S>::value)
373  
            std::terminate();
373  
            std::terminate();
374  
        sink_ = &new_sink;
374  
        sink_ = &new_sink;
375  
    }
375  
    }
376  

376  

377  
private:
377  
private:
378  
    /** Forward a partial write through the vtable.
378  
    /** Forward a partial write through the vtable.
379  

379  

380  
        Constructs the underlying `write_some` awaitable in
380  
        Constructs the underlying `write_some` awaitable in
381  
        cached storage and returns a type-erased awaitable.
381  
        cached storage and returns a type-erased awaitable.
382  
    */
382  
    */
383  
    auto
383  
    auto
384  
    write_some_(std::span<const_buffer const> buffers);
384  
    write_some_(std::span<const_buffer const> buffers);
385  

385  

386  
    /** Forward a complete write through the vtable.
386  
    /** Forward a complete write through the vtable.
387  

387  

388  
        Constructs the underlying `write` awaitable in
388  
        Constructs the underlying `write` awaitable in
389  
        cached storage and returns a type-erased awaitable.
389  
        cached storage and returns a type-erased awaitable.
390  
    */
390  
    */
391  
    auto
391  
    auto
392  
    write_(std::span<const_buffer const> buffers);
392  
    write_(std::span<const_buffer const> buffers);
393  

393  

394  
    /** Forward an atomic write-with-EOF through the vtable.
394  
    /** Forward an atomic write-with-EOF through the vtable.
395  

395  

396  
        Constructs the underlying `write_eof(buffers)` awaitable
396  
        Constructs the underlying `write_eof(buffers)` awaitable
397  
        in cached storage and returns a type-erased awaitable.
397  
        in cached storage and returns a type-erased awaitable.
398  
    */
398  
    */
399  
    auto
399  
    auto
400  
    write_eof_buffers_(std::span<const_buffer const> buffers);
400  
    write_eof_buffers_(std::span<const_buffer const> buffers);
401  
};
401  
};
402  

402  

403  
/** Type-erased ops for awaitables yielding `io_result<>`. */
403  
/** Type-erased ops for awaitables yielding `io_result<>`. */
404  
struct any_buffer_sink::awaitable_ops
404  
struct any_buffer_sink::awaitable_ops
405  
{
405  
{
406  
    bool (*await_ready)(void*);
406  
    bool (*await_ready)(void*);
407  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
407  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
408  
    io_result<> (*await_resume)(void*);
408  
    io_result<> (*await_resume)(void*);
409  
    void (*destroy)(void*) noexcept;
409  
    void (*destroy)(void*) noexcept;
410  
};
410  
};
411  

411  

412  
/** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
412  
/** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
413  
struct any_buffer_sink::write_awaitable_ops
413  
struct any_buffer_sink::write_awaitable_ops
414  
{
414  
{
415  
    bool (*await_ready)(void*);
415  
    bool (*await_ready)(void*);
416  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
416  
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
417  
    io_result<std::size_t> (*await_resume)(void*);
417  
    io_result<std::size_t> (*await_resume)(void*);
418  
    void (*destroy)(void*) noexcept;
418  
    void (*destroy)(void*) noexcept;
419  
};
419  
};
420  

420  

421  
struct any_buffer_sink::vtable
421  
struct any_buffer_sink::vtable
422  
{
422  
{
423  
    void (*destroy)(void*) noexcept;
423  
    void (*destroy)(void*) noexcept;
424  
    std::span<mutable_buffer> (*do_prepare)(
424  
    std::span<mutable_buffer> (*do_prepare)(
425  
        void* sink,
425  
        void* sink,
426  
        std::span<mutable_buffer> dest);
426  
        std::span<mutable_buffer> dest);
427  
    std::size_t awaitable_size;
427  
    std::size_t awaitable_size;
428  
    std::size_t awaitable_align;
428  
    std::size_t awaitable_align;
429  
    awaitable_ops const* (*construct_commit_awaitable)(
429  
    awaitable_ops const* (*construct_commit_awaitable)(
430  
        void* sink,
430  
        void* sink,
431  
        void* storage,
431  
        void* storage,
432  
        std::size_t n);
432  
        std::size_t n);
433  
    awaitable_ops const* (*construct_commit_eof_awaitable)(
433  
    awaitable_ops const* (*construct_commit_eof_awaitable)(
434  
        void* sink,
434  
        void* sink,
435  
        void* storage,
435  
        void* storage,
436  
        std::size_t n);
436  
        std::size_t n);
437  

437  

438  
    // WriteSink forwarding (null when wrapped type is BufferSink-only)
438  
    // WriteSink forwarding (null when wrapped type is BufferSink-only)
439  
    write_awaitable_ops const* (*construct_write_some_awaitable)(
439  
    write_awaitable_ops const* (*construct_write_some_awaitable)(
440  
        void* sink,
440  
        void* sink,
441  
        void* storage,
441  
        void* storage,
442  
        std::span<const_buffer const> buffers);
442  
        std::span<const_buffer const> buffers);
443  
    write_awaitable_ops const* (*construct_write_awaitable)(
443  
    write_awaitable_ops const* (*construct_write_awaitable)(
444  
        void* sink,
444  
        void* sink,
445  
        void* storage,
445  
        void* storage,
446  
        std::span<const_buffer const> buffers);
446  
        std::span<const_buffer const> buffers);
447  
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
447  
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
448  
        void* sink,
448  
        void* sink,
449  
        void* storage,
449  
        void* storage,
450  
        std::span<const_buffer const> buffers);
450  
        std::span<const_buffer const> buffers);
451  
    awaitable_ops const* (*construct_write_eof_awaitable)(
451  
    awaitable_ops const* (*construct_write_eof_awaitable)(
452  
        void* sink,
452  
        void* sink,
453  
        void* storage);
453  
        void* storage);
454  
};
454  
};
455  

455  

456  
template<BufferSink S>
456  
template<BufferSink S>
457  
struct any_buffer_sink::vtable_for_impl
457  
struct any_buffer_sink::vtable_for_impl
458  
{
458  
{
459  
    using CommitAwaitable = decltype(std::declval<S&>().commit(
459  
    using CommitAwaitable = decltype(std::declval<S&>().commit(
460  
        std::size_t{}));
460  
        std::size_t{}));
461  
    using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
461  
    using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
462  
        std::size_t{}));
462  
        std::size_t{}));
463  

463  

464  
    static void
464  
    static void
465  
    do_destroy_impl(void* sink) noexcept
465  
    do_destroy_impl(void* sink) noexcept
466  
    {
466  
    {
467  
        static_cast<S*>(sink)->~S();
467  
        static_cast<S*>(sink)->~S();
468  
    }
468  
    }
469  

469  

470  
    static std::span<mutable_buffer>
470  
    static std::span<mutable_buffer>
471  
    do_prepare_impl(
471  
    do_prepare_impl(
472  
        void* sink,
472  
        void* sink,
473  
        std::span<mutable_buffer> dest)
473  
        std::span<mutable_buffer> dest)
474  
    {
474  
    {
475  
        auto& s = *static_cast<S*>(sink);
475  
        auto& s = *static_cast<S*>(sink);
476  
        return s.prepare(dest);
476  
        return s.prepare(dest);
477  
    }
477  
    }
478  

478  

479  
    static awaitable_ops const*
479  
    static awaitable_ops const*
480  
    construct_commit_awaitable_impl(
480  
    construct_commit_awaitable_impl(
481  
        void* sink,
481  
        void* sink,
482  
        void* storage,
482  
        void* storage,
483  
        std::size_t n)
483  
        std::size_t n)
484  
    {
484  
    {
485  
        auto& s = *static_cast<S*>(sink);
485  
        auto& s = *static_cast<S*>(sink);
486  
        ::new(storage) CommitAwaitable(s.commit(n));
486  
        ::new(storage) CommitAwaitable(s.commit(n));
487  

487  

488  
        static constexpr awaitable_ops ops = {
488  
        static constexpr awaitable_ops ops = {
489  
            +[](void* p) {
489  
            +[](void* p) {
490  
                return static_cast<CommitAwaitable*>(p)->await_ready();
490  
                return static_cast<CommitAwaitable*>(p)->await_ready();
491  
            },
491  
            },
492  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
492  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
493  
                return detail::call_await_suspend(
493  
                return detail::call_await_suspend(
494  
                    static_cast<CommitAwaitable*>(p), h, env);
494  
                    static_cast<CommitAwaitable*>(p), h, env);
495  
            },
495  
            },
496  
            +[](void* p) {
496  
            +[](void* p) {
497  
                return static_cast<CommitAwaitable*>(p)->await_resume();
497  
                return static_cast<CommitAwaitable*>(p)->await_resume();
498  
            },
498  
            },
499  
            +[](void* p) noexcept {
499  
            +[](void* p) noexcept {
500  
                static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
500  
                static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
501  
            }
501  
            }
502  
        };
502  
        };
503  
        return &ops;
503  
        return &ops;
504  
    }
504  
    }
505  

505  

506  
    static awaitable_ops const*
506  
    static awaitable_ops const*
507  
    construct_commit_eof_awaitable_impl(
507  
    construct_commit_eof_awaitable_impl(
508  
        void* sink,
508  
        void* sink,
509  
        void* storage,
509  
        void* storage,
510  
        std::size_t n)
510  
        std::size_t n)
511  
    {
511  
    {
512  
        auto& s = *static_cast<S*>(sink);
512  
        auto& s = *static_cast<S*>(sink);
513  
        ::new(storage) CommitEofAwaitable(s.commit_eof(n));
513  
        ::new(storage) CommitEofAwaitable(s.commit_eof(n));
514  

514  

515  
        static constexpr awaitable_ops ops = {
515  
        static constexpr awaitable_ops ops = {
516  
            +[](void* p) {
516  
            +[](void* p) {
517  
                return static_cast<CommitEofAwaitable*>(p)->await_ready();
517  
                return static_cast<CommitEofAwaitable*>(p)->await_ready();
518  
            },
518  
            },
519  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
519  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
520  
                return detail::call_await_suspend(
520  
                return detail::call_await_suspend(
521  
                    static_cast<CommitEofAwaitable*>(p), h, env);
521  
                    static_cast<CommitEofAwaitable*>(p), h, env);
522  
            },
522  
            },
523  
            +[](void* p) {
523  
            +[](void* p) {
524  
                return static_cast<CommitEofAwaitable*>(p)->await_resume();
524  
                return static_cast<CommitEofAwaitable*>(p)->await_resume();
525  
            },
525  
            },
526  
            +[](void* p) noexcept {
526  
            +[](void* p) noexcept {
527  
                static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
527  
                static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
528  
            }
528  
            }
529  
        };
529  
        };
530  
        return &ops;
530  
        return &ops;
531  
    }
531  
    }
532  

532  

533  
    static write_awaitable_ops const*
533  
    static write_awaitable_ops const*
534  
    construct_write_some_awaitable_impl(
534  
    construct_write_some_awaitable_impl(
535  
        void* sink,
535  
        void* sink,
536  
        void* storage,
536  
        void* storage,
537  
        std::span<const_buffer const> buffers)
537  
        std::span<const_buffer const> buffers)
538  
        requires WriteSink<S>
538  
        requires WriteSink<S>
539  
    {
539  
    {
540  
        using Aw = decltype(std::declval<S&>().write_some(
540  
        using Aw = decltype(std::declval<S&>().write_some(
541  
            std::span<const_buffer const>{}));
541  
            std::span<const_buffer const>{}));
542  
        auto& s = *static_cast<S*>(sink);
542  
        auto& s = *static_cast<S*>(sink);
543  
        ::new(storage) Aw(s.write_some(buffers));
543  
        ::new(storage) Aw(s.write_some(buffers));
544  

544  

545  
        static constexpr write_awaitable_ops ops = {
545  
        static constexpr write_awaitable_ops ops = {
546  
            +[](void* p) {
546  
            +[](void* p) {
547  
                return static_cast<Aw*>(p)->await_ready();
547  
                return static_cast<Aw*>(p)->await_ready();
548  
            },
548  
            },
549  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
549  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
550  
                return detail::call_await_suspend(
550  
                return detail::call_await_suspend(
551  
                    static_cast<Aw*>(p), h, env);
551  
                    static_cast<Aw*>(p), h, env);
552  
            },
552  
            },
553  
            +[](void* p) {
553  
            +[](void* p) {
554  
                return static_cast<Aw*>(p)->await_resume();
554  
                return static_cast<Aw*>(p)->await_resume();
555  
            },
555  
            },
556  
            +[](void* p) noexcept {
556  
            +[](void* p) noexcept {
557  
                static_cast<Aw*>(p)->~Aw();
557  
                static_cast<Aw*>(p)->~Aw();
558  
            }
558  
            }
559  
        };
559  
        };
560  
        return &ops;
560  
        return &ops;
561  
    }
561  
    }
562  

562  

563  
    static write_awaitable_ops const*
563  
    static write_awaitable_ops const*
564  
    construct_write_awaitable_impl(
564  
    construct_write_awaitable_impl(
565  
        void* sink,
565  
        void* sink,
566  
        void* storage,
566  
        void* storage,
567  
        std::span<const_buffer const> buffers)
567  
        std::span<const_buffer const> buffers)
568  
        requires WriteSink<S>
568  
        requires WriteSink<S>
569  
    {
569  
    {
570  
        using Aw = decltype(std::declval<S&>().write(
570  
        using Aw = decltype(std::declval<S&>().write(
571  
            std::span<const_buffer const>{}));
571  
            std::span<const_buffer const>{}));
572  
        auto& s = *static_cast<S*>(sink);
572  
        auto& s = *static_cast<S*>(sink);
573  
        ::new(storage) Aw(s.write(buffers));
573  
        ::new(storage) Aw(s.write(buffers));
574  

574  

575  
        static constexpr write_awaitable_ops ops = {
575  
        static constexpr write_awaitable_ops ops = {
576  
            +[](void* p) {
576  
            +[](void* p) {
577  
                return static_cast<Aw*>(p)->await_ready();
577  
                return static_cast<Aw*>(p)->await_ready();
578  
            },
578  
            },
579  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
579  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
580  
                return detail::call_await_suspend(
580  
                return detail::call_await_suspend(
581  
                    static_cast<Aw*>(p), h, env);
581  
                    static_cast<Aw*>(p), h, env);
582  
            },
582  
            },
583  
            +[](void* p) {
583  
            +[](void* p) {
584  
                return static_cast<Aw*>(p)->await_resume();
584  
                return static_cast<Aw*>(p)->await_resume();
585  
            },
585  
            },
586  
            +[](void* p) noexcept {
586  
            +[](void* p) noexcept {
587  
                static_cast<Aw*>(p)->~Aw();
587  
                static_cast<Aw*>(p)->~Aw();
588  
            }
588  
            }
589  
        };
589  
        };
590  
        return &ops;
590  
        return &ops;
591  
    }
591  
    }
592  

592  

593  
    static write_awaitable_ops const*
593  
    static write_awaitable_ops const*
594  
    construct_write_eof_buffers_awaitable_impl(
594  
    construct_write_eof_buffers_awaitable_impl(
595  
        void* sink,
595  
        void* sink,
596  
        void* storage,
596  
        void* storage,
597  
        std::span<const_buffer const> buffers)
597  
        std::span<const_buffer const> buffers)
598  
        requires WriteSink<S>
598  
        requires WriteSink<S>
599  
    {
599  
    {
600  
        using Aw = decltype(std::declval<S&>().write_eof(
600  
        using Aw = decltype(std::declval<S&>().write_eof(
601  
            std::span<const_buffer const>{}));
601  
            std::span<const_buffer const>{}));
602  
        auto& s = *static_cast<S*>(sink);
602  
        auto& s = *static_cast<S*>(sink);
603  
        ::new(storage) Aw(s.write_eof(buffers));
603  
        ::new(storage) Aw(s.write_eof(buffers));
604  

604  

605  
        static constexpr write_awaitable_ops ops = {
605  
        static constexpr write_awaitable_ops ops = {
606  
            +[](void* p) {
606  
            +[](void* p) {
607  
                return static_cast<Aw*>(p)->await_ready();
607  
                return static_cast<Aw*>(p)->await_ready();
608  
            },
608  
            },
609  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
609  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
610  
                return detail::call_await_suspend(
610  
                return detail::call_await_suspend(
611  
                    static_cast<Aw*>(p), h, env);
611  
                    static_cast<Aw*>(p), h, env);
612  
            },
612  
            },
613  
            +[](void* p) {
613  
            +[](void* p) {
614  
                return static_cast<Aw*>(p)->await_resume();
614  
                return static_cast<Aw*>(p)->await_resume();
615  
            },
615  
            },
616  
            +[](void* p) noexcept {
616  
            +[](void* p) noexcept {
617  
                static_cast<Aw*>(p)->~Aw();
617  
                static_cast<Aw*>(p)->~Aw();
618  
            }
618  
            }
619  
        };
619  
        };
620  
        return &ops;
620  
        return &ops;
621  
    }
621  
    }
622  

622  

623  
    static awaitable_ops const*
623  
    static awaitable_ops const*
624  
    construct_write_eof_awaitable_impl(
624  
    construct_write_eof_awaitable_impl(
625  
        void* sink,
625  
        void* sink,
626  
        void* storage)
626  
        void* storage)
627  
        requires WriteSink<S>
627  
        requires WriteSink<S>
628  
    {
628  
    {
629  
        using Aw = decltype(std::declval<S&>().write_eof());
629  
        using Aw = decltype(std::declval<S&>().write_eof());
630  
        auto& s = *static_cast<S*>(sink);
630  
        auto& s = *static_cast<S*>(sink);
631  
        ::new(storage) Aw(s.write_eof());
631  
        ::new(storage) Aw(s.write_eof());
632  

632  

633  
        static constexpr awaitable_ops ops = {
633  
        static constexpr awaitable_ops ops = {
634  
            +[](void* p) {
634  
            +[](void* p) {
635  
                return static_cast<Aw*>(p)->await_ready();
635  
                return static_cast<Aw*>(p)->await_ready();
636  
            },
636  
            },
637  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
637  
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
638  
                return detail::call_await_suspend(
638  
                return detail::call_await_suspend(
639  
                    static_cast<Aw*>(p), h, env);
639  
                    static_cast<Aw*>(p), h, env);
640  
            },
640  
            },
641  
            +[](void* p) {
641  
            +[](void* p) {
642  
                return static_cast<Aw*>(p)->await_resume();
642  
                return static_cast<Aw*>(p)->await_resume();
643  
            },
643  
            },
644  
            +[](void* p) noexcept {
644  
            +[](void* p) noexcept {
645  
                static_cast<Aw*>(p)->~Aw();
645  
                static_cast<Aw*>(p)->~Aw();
646  
            }
646  
            }
647  
        };
647  
        };
648  
        return &ops;
648  
        return &ops;
649  
    }
649  
    }
650  

650  

651  
    static consteval std::size_t
651  
    static consteval std::size_t
652  
    compute_max_size() noexcept
652  
    compute_max_size() noexcept
653  
    {
653  
    {
654  
        std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
654  
        std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
655  
            ? sizeof(CommitAwaitable)
655  
            ? sizeof(CommitAwaitable)
656  
            : sizeof(CommitEofAwaitable);
656  
            : sizeof(CommitEofAwaitable);
657  
        if constexpr (WriteSink<S>)
657  
        if constexpr (WriteSink<S>)
658  
        {
658  
        {
659  
            using WS = decltype(std::declval<S&>().write_some(
659  
            using WS = decltype(std::declval<S&>().write_some(
660  
                std::span<const_buffer const>{}));
660  
                std::span<const_buffer const>{}));
661  
            using W = decltype(std::declval<S&>().write(
661  
            using W = decltype(std::declval<S&>().write(
662  
                std::span<const_buffer const>{}));
662  
                std::span<const_buffer const>{}));
663  
            using WEB = decltype(std::declval<S&>().write_eof(
663  
            using WEB = decltype(std::declval<S&>().write_eof(
664  
                std::span<const_buffer const>{}));
664  
                std::span<const_buffer const>{}));
665  
            using WE = decltype(std::declval<S&>().write_eof());
665  
            using WE = decltype(std::declval<S&>().write_eof());
666  

666  

667  
            if(sizeof(WS) > s) s = sizeof(WS);
667  
            if(sizeof(WS) > s) s = sizeof(WS);
668  
            if(sizeof(W) > s) s = sizeof(W);
668  
            if(sizeof(W) > s) s = sizeof(W);
669  
            if(sizeof(WEB) > s) s = sizeof(WEB);
669  
            if(sizeof(WEB) > s) s = sizeof(WEB);
670  
            if(sizeof(WE) > s) s = sizeof(WE);
670  
            if(sizeof(WE) > s) s = sizeof(WE);
671  
        }
671  
        }
672  
        return s;
672  
        return s;
673  
    }
673  
    }
674  

674  

675  
    static consteval std::size_t
675  
    static consteval std::size_t
676  
    compute_max_align() noexcept
676  
    compute_max_align() noexcept
677  
    {
677  
    {
678  
        std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
678  
        std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
679  
            ? alignof(CommitAwaitable)
679  
            ? alignof(CommitAwaitable)
680  
            : alignof(CommitEofAwaitable);
680  
            : alignof(CommitEofAwaitable);
681  
        if constexpr (WriteSink<S>)
681  
        if constexpr (WriteSink<S>)
682  
        {
682  
        {
683  
            using WS = decltype(std::declval<S&>().write_some(
683  
            using WS = decltype(std::declval<S&>().write_some(
684  
                std::span<const_buffer const>{}));
684  
                std::span<const_buffer const>{}));
685  
            using W = decltype(std::declval<S&>().write(
685  
            using W = decltype(std::declval<S&>().write(
686  
                std::span<const_buffer const>{}));
686  
                std::span<const_buffer const>{}));
687  
            using WEB = decltype(std::declval<S&>().write_eof(
687  
            using WEB = decltype(std::declval<S&>().write_eof(
688  
                std::span<const_buffer const>{}));
688  
                std::span<const_buffer const>{}));
689  
            using WE = decltype(std::declval<S&>().write_eof());
689  
            using WE = decltype(std::declval<S&>().write_eof());
690  

690  

691  
            if(alignof(WS) > a) a = alignof(WS);
691  
            if(alignof(WS) > a) a = alignof(WS);
692  
            if(alignof(W) > a) a = alignof(W);
692  
            if(alignof(W) > a) a = alignof(W);
693  
            if(alignof(WEB) > a) a = alignof(WEB);
693  
            if(alignof(WEB) > a) a = alignof(WEB);
694  
            if(alignof(WE) > a) a = alignof(WE);
694  
            if(alignof(WE) > a) a = alignof(WE);
695  
        }
695  
        }
696  
        return a;
696  
        return a;
697  
    }
697  
    }
698  

698  

699  
    static consteval vtable
699  
    static consteval vtable
700  
    make_vtable() noexcept
700  
    make_vtable() noexcept
701  
    {
701  
    {
702  
        vtable v{};
702  
        vtable v{};
703  
        v.destroy = &do_destroy_impl;
703  
        v.destroy = &do_destroy_impl;
704  
        v.do_prepare = &do_prepare_impl;
704  
        v.do_prepare = &do_prepare_impl;
705  
        v.awaitable_size = compute_max_size();
705  
        v.awaitable_size = compute_max_size();
706  
        v.awaitable_align = compute_max_align();
706  
        v.awaitable_align = compute_max_align();
707  
        v.construct_commit_awaitable = &construct_commit_awaitable_impl;
707  
        v.construct_commit_awaitable = &construct_commit_awaitable_impl;
708  
        v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
708  
        v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
709  
        v.construct_write_some_awaitable = nullptr;
709  
        v.construct_write_some_awaitable = nullptr;
710  
        v.construct_write_awaitable = nullptr;
710  
        v.construct_write_awaitable = nullptr;
711  
        v.construct_write_eof_buffers_awaitable = nullptr;
711  
        v.construct_write_eof_buffers_awaitable = nullptr;
712  
        v.construct_write_eof_awaitable = nullptr;
712  
        v.construct_write_eof_awaitable = nullptr;
713  

713  

714  
        if constexpr (WriteSink<S>)
714  
        if constexpr (WriteSink<S>)
715  
        {
715  
        {
716  
            v.construct_write_some_awaitable =
716  
            v.construct_write_some_awaitable =
717  
                &construct_write_some_awaitable_impl;
717  
                &construct_write_some_awaitable_impl;
718  
            v.construct_write_awaitable =
718  
            v.construct_write_awaitable =
719  
                &construct_write_awaitable_impl;
719  
                &construct_write_awaitable_impl;
720  
            v.construct_write_eof_buffers_awaitable =
720  
            v.construct_write_eof_buffers_awaitable =
721  
                &construct_write_eof_buffers_awaitable_impl;
721  
                &construct_write_eof_buffers_awaitable_impl;
722  
            v.construct_write_eof_awaitable =
722  
            v.construct_write_eof_awaitable =
723  
                &construct_write_eof_awaitable_impl;
723  
                &construct_write_eof_awaitable_impl;
724  
        }
724  
        }
725  
        return v;
725  
        return v;
726  
    }
726  
    }
727  

727  

728  
    static constexpr vtable value = make_vtable();
728  
    static constexpr vtable value = make_vtable();
729  
};
729  
};
730  

730  

731  
inline
731  
inline
732  
any_buffer_sink::~any_buffer_sink()
732  
any_buffer_sink::~any_buffer_sink()
733  
{
733  
{
734  
    if(storage_)
734  
    if(storage_)
735  
    {
735  
    {
736  
        vt_->destroy(sink_);
736  
        vt_->destroy(sink_);
737  
        ::operator delete(storage_);
737  
        ::operator delete(storage_);
738  
    }
738  
    }
739  
    if(cached_awaitable_)
739  
    if(cached_awaitable_)
740  
        ::operator delete(cached_awaitable_);
740  
        ::operator delete(cached_awaitable_);
741  
}
741  
}
742  

742  

743  
/** Move-assign from `other`.

    Releases whatever this wrapper currently holds, then takes
    over every pointer of `other`, leaving `other` with nulls.
    Self-assignment is a no-op.
*/
inline any_buffer_sink&
any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
{
    if(this == &other)
        return *this;

    // Release the current contents.
    if(storage_ != nullptr)
    {
        vt_->destroy(sink_);
        ::operator delete(storage_);
    }
    if(cached_awaitable_ != nullptr)
        ::operator delete(cached_awaitable_);

    // Take over other's state and null it out.
    sink_ = std::exchange(other.sink_, nullptr);
    vt_ = std::exchange(other.vt_, nullptr);
    cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
    storage_ = std::exchange(other.storage_, nullptr);
    active_ops_ = std::exchange(other.active_ops_, nullptr);
    active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);

    return *this;
}
764  

764  

765  
template<BufferSink S>
765  
template<BufferSink S>
766  
    requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
766  
    requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
767  
any_buffer_sink::any_buffer_sink(S s)
767  
any_buffer_sink::any_buffer_sink(S s)
768  
    : vt_(&vtable_for_impl<S>::value)
768  
    : vt_(&vtable_for_impl<S>::value)
769  
{
769  
{
770  
    struct guard {
770  
    struct guard {
771  
        any_buffer_sink* self;
771  
        any_buffer_sink* self;
772  
        bool committed = false;
772  
        bool committed = false;
773  
        ~guard() {
773  
        ~guard() {
774  
            if(!committed && self->storage_) {
774  
            if(!committed && self->storage_) {
775  
                self->vt_->destroy(self->sink_);
775  
                self->vt_->destroy(self->sink_);
776  
                ::operator delete(self->storage_);
776  
                ::operator delete(self->storage_);
777  
                self->storage_ = nullptr;
777  
                self->storage_ = nullptr;
778  
                self->sink_ = nullptr;
778  
                self->sink_ = nullptr;
779  
            }
779  
            }
780  
        }
780  
        }
781  
    } g{this};
781  
    } g{this};
782  

782  

783  
    storage_ = ::operator new(sizeof(S));
783  
    storage_ = ::operator new(sizeof(S));
784  
    sink_ = ::new(storage_) S(std::move(s));
784  
    sink_ = ::new(storage_) S(std::move(s));
785  

785  

786  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
786  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
787  

787  

788  
    g.committed = true;
788  
    g.committed = true;
789  
}
789  
}
790  

790  

791  
template<BufferSink S>
791  
template<BufferSink S>
792  
any_buffer_sink::any_buffer_sink(S* s)
792  
any_buffer_sink::any_buffer_sink(S* s)
793  
    : sink_(s)
793  
    : sink_(s)
794  
    , vt_(&vtable_for_impl<S>::value)
794  
    , vt_(&vtable_for_impl<S>::value)
795  
{
795  
{
796  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
796  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
797  
}
797  
}
798  

798  

799  
// Forwards `dest` to the wrapped sink's prepare operation through the
// vtable and returns whatever span that operation produces.
inline std::span<mutable_buffer>
any_buffer_sink::prepare(std::span<mutable_buffer> dest)
{
    std::span<mutable_buffer> prepared = vt_->do_prepare(sink_, dest);
    return prepared;
}
804  

804  

805  
inline auto
805  
inline auto
806  
any_buffer_sink::commit(std::size_t n)
806  
any_buffer_sink::commit(std::size_t n)
807  
{
807  
{
808  
    struct awaitable
808  
    struct awaitable
809  
    {
809  
    {
810  
        any_buffer_sink* self_;
810  
        any_buffer_sink* self_;
811  
        std::size_t n_;
811  
        std::size_t n_;
812  

812  

813  
        bool
813  
        bool
814  
        await_ready()
814  
        await_ready()
815  
        {
815  
        {
816  
            self_->active_ops_ = self_->vt_->construct_commit_awaitable(
816  
            self_->active_ops_ = self_->vt_->construct_commit_awaitable(
817  
                self_->sink_,
817  
                self_->sink_,
818  
                self_->cached_awaitable_,
818  
                self_->cached_awaitable_,
819  
                n_);
819  
                n_);
820  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
820  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
821  
        }
821  
        }
822  

822  

823  
        std::coroutine_handle<>
823  
        std::coroutine_handle<>
824  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
824  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
825  
        {
825  
        {
826  
            return self_->active_ops_->await_suspend(
826  
            return self_->active_ops_->await_suspend(
827  
                self_->cached_awaitable_, h, env);
827  
                self_->cached_awaitable_, h, env);
828  
        }
828  
        }
829  

829  

830  
        io_result<>
830  
        io_result<>
831  
        await_resume()
831  
        await_resume()
832  
        {
832  
        {
833  
            struct guard {
833  
            struct guard {
834  
                any_buffer_sink* self;
834  
                any_buffer_sink* self;
835  
                ~guard() {
835  
                ~guard() {
836  
                    self->active_ops_->destroy(self->cached_awaitable_);
836  
                    self->active_ops_->destroy(self->cached_awaitable_);
837  
                    self->active_ops_ = nullptr;
837  
                    self->active_ops_ = nullptr;
838  
                }
838  
                }
839  
            } g{self_};
839  
            } g{self_};
840  
            return self_->active_ops_->await_resume(
840  
            return self_->active_ops_->await_resume(
841  
                self_->cached_awaitable_);
841  
                self_->cached_awaitable_);
842  
        }
842  
        }
843  
    };
843  
    };
844  
    return awaitable{this, n};
844  
    return awaitable{this, n};
845  
}
845  
}
846  

846  

847  
inline auto
847  
inline auto
848  
any_buffer_sink::commit_eof(std::size_t n)
848  
any_buffer_sink::commit_eof(std::size_t n)
849  
{
849  
{
850  
    struct awaitable
850  
    struct awaitable
851  
    {
851  
    {
852  
        any_buffer_sink* self_;
852  
        any_buffer_sink* self_;
853  
        std::size_t n_;
853  
        std::size_t n_;
854  

854  

855  
        bool
855  
        bool
856  
        await_ready()
856  
        await_ready()
857  
        {
857  
        {
858  
            self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
858  
            self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
859  
                self_->sink_,
859  
                self_->sink_,
860  
                self_->cached_awaitable_,
860  
                self_->cached_awaitable_,
861  
                n_);
861  
                n_);
862  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
862  
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
863  
        }
863  
        }
864  

864  

865  
        std::coroutine_handle<>
865  
        std::coroutine_handle<>
866  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
866  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
867  
        {
867  
        {
868  
            return self_->active_ops_->await_suspend(
868  
            return self_->active_ops_->await_suspend(
869  
                self_->cached_awaitable_, h, env);
869  
                self_->cached_awaitable_, h, env);
870  
        }
870  
        }
871  

871  

872  
        io_result<>
872  
        io_result<>
873  
        await_resume()
873  
        await_resume()
874  
        {
874  
        {
875  
            struct guard {
875  
            struct guard {
876  
                any_buffer_sink* self;
876  
                any_buffer_sink* self;
877  
                ~guard() {
877  
                ~guard() {
878  
                    self->active_ops_->destroy(self->cached_awaitable_);
878  
                    self->active_ops_->destroy(self->cached_awaitable_);
879  
                    self->active_ops_ = nullptr;
879  
                    self->active_ops_ = nullptr;
880  
                }
880  
                }
881  
            } g{self_};
881  
            } g{self_};
882  
            return self_->active_ops_->await_resume(
882  
            return self_->active_ops_->await_resume(
883  
                self_->cached_awaitable_);
883  
                self_->cached_awaitable_);
884  
        }
884  
        }
885  
    };
885  
    };
886  
    return awaitable{this, n};
886  
    return awaitable{this, n};
887  
}
887  
}
888  

888  

889  
inline auto
889  
inline auto
890  
any_buffer_sink::write_some_(
890  
any_buffer_sink::write_some_(
891  
    std::span<const_buffer const> buffers)
891  
    std::span<const_buffer const> buffers)
892  
{
892  
{
893  
    struct awaitable
893  
    struct awaitable
894  
    {
894  
    {
895  
        any_buffer_sink* self_;
895  
        any_buffer_sink* self_;
896  
        std::span<const_buffer const> buffers_;
896  
        std::span<const_buffer const> buffers_;
897  

897  

898  
        bool
898  
        bool
899  
        await_ready() const noexcept
899  
        await_ready() const noexcept
900  
        {
900  
        {
901  
            return false;
901  
            return false;
902  
        }
902  
        }
903  

903  

904  
        std::coroutine_handle<>
904  
        std::coroutine_handle<>
905  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
905  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
906  
        {
906  
        {
907  
            self_->active_write_ops_ =
907  
            self_->active_write_ops_ =
908  
                self_->vt_->construct_write_some_awaitable(
908  
                self_->vt_->construct_write_some_awaitable(
909  
                    self_->sink_,
909  
                    self_->sink_,
910  
                    self_->cached_awaitable_,
910  
                    self_->cached_awaitable_,
911  
                    buffers_);
911  
                    buffers_);
912  

912  

913  
            if(self_->active_write_ops_->await_ready(
913  
            if(self_->active_write_ops_->await_ready(
914  
                self_->cached_awaitable_))
914  
                self_->cached_awaitable_))
915  
                return h;
915  
                return h;
916  

916  

917  
            return self_->active_write_ops_->await_suspend(
917  
            return self_->active_write_ops_->await_suspend(
918  
                self_->cached_awaitable_, h, env);
918  
                self_->cached_awaitable_, h, env);
919  
        }
919  
        }
920  

920  

921  
        io_result<std::size_t>
921  
        io_result<std::size_t>
922  
        await_resume()
922  
        await_resume()
923  
        {
923  
        {
924  
            struct guard {
924  
            struct guard {
925  
                any_buffer_sink* self;
925  
                any_buffer_sink* self;
926  
                ~guard() {
926  
                ~guard() {
927  
                    self->active_write_ops_->destroy(
927  
                    self->active_write_ops_->destroy(
928  
                        self->cached_awaitable_);
928  
                        self->cached_awaitable_);
929  
                    self->active_write_ops_ = nullptr;
929  
                    self->active_write_ops_ = nullptr;
930  
                }
930  
                }
931  
            } g{self_};
931  
            } g{self_};
932  
            return self_->active_write_ops_->await_resume(
932  
            return self_->active_write_ops_->await_resume(
933  
                self_->cached_awaitable_);
933  
                self_->cached_awaitable_);
934  
        }
934  
        }
935  
    };
935  
    };
936  
    return awaitable{this, buffers};
936  
    return awaitable{this, buffers};
937  
}
937  
}
938  

938  

939  
inline auto
939  
inline auto
940  
any_buffer_sink::write_(
940  
any_buffer_sink::write_(
941  
    std::span<const_buffer const> buffers)
941  
    std::span<const_buffer const> buffers)
942  
{
942  
{
943  
    struct awaitable
943  
    struct awaitable
944  
    {
944  
    {
945  
        any_buffer_sink* self_;
945  
        any_buffer_sink* self_;
946  
        std::span<const_buffer const> buffers_;
946  
        std::span<const_buffer const> buffers_;
947  

947  

948  
        bool
948  
        bool
949  
        await_ready() const noexcept
949  
        await_ready() const noexcept
950  
        {
950  
        {
951  
            return false;
951  
            return false;
952  
        }
952  
        }
953  

953  

954  
        std::coroutine_handle<>
954  
        std::coroutine_handle<>
955  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
955  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
956  
        {
956  
        {
957  
            self_->active_write_ops_ =
957  
            self_->active_write_ops_ =
958  
                self_->vt_->construct_write_awaitable(
958  
                self_->vt_->construct_write_awaitable(
959  
                    self_->sink_,
959  
                    self_->sink_,
960  
                    self_->cached_awaitable_,
960  
                    self_->cached_awaitable_,
961  
                    buffers_);
961  
                    buffers_);
962  

962  

963  
            if(self_->active_write_ops_->await_ready(
963  
            if(self_->active_write_ops_->await_ready(
964  
                self_->cached_awaitable_))
964  
                self_->cached_awaitable_))
965  
                return h;
965  
                return h;
966  

966  

967  
            return self_->active_write_ops_->await_suspend(
967  
            return self_->active_write_ops_->await_suspend(
968  
                self_->cached_awaitable_, h, env);
968  
                self_->cached_awaitable_, h, env);
969  
        }
969  
        }
970  

970  

971  
        io_result<std::size_t>
971  
        io_result<std::size_t>
972  
        await_resume()
972  
        await_resume()
973  
        {
973  
        {
974  
            struct guard {
974  
            struct guard {
975  
                any_buffer_sink* self;
975  
                any_buffer_sink* self;
976  
                ~guard() {
976  
                ~guard() {
977  
                    self->active_write_ops_->destroy(
977  
                    self->active_write_ops_->destroy(
978  
                        self->cached_awaitable_);
978  
                        self->cached_awaitable_);
979  
                    self->active_write_ops_ = nullptr;
979  
                    self->active_write_ops_ = nullptr;
980  
                }
980  
                }
981  
            } g{self_};
981  
            } g{self_};
982  
            return self_->active_write_ops_->await_resume(
982  
            return self_->active_write_ops_->await_resume(
983  
                self_->cached_awaitable_);
983  
                self_->cached_awaitable_);
984  
        }
984  
        }
985  
    };
985  
    };
986  
    return awaitable{this, buffers};
986  
    return awaitable{this, buffers};
987  
}
987  
}
988  

988  

989  
inline auto
989  
inline auto
990  
any_buffer_sink::write_eof_buffers_(
990  
any_buffer_sink::write_eof_buffers_(
991  
    std::span<const_buffer const> buffers)
991  
    std::span<const_buffer const> buffers)
992  
{
992  
{
993  
    struct awaitable
993  
    struct awaitable
994  
    {
994  
    {
995  
        any_buffer_sink* self_;
995  
        any_buffer_sink* self_;
996  
        std::span<const_buffer const> buffers_;
996  
        std::span<const_buffer const> buffers_;
997  

997  

998  
        bool
998  
        bool
999  
        await_ready() const noexcept
999  
        await_ready() const noexcept
1000  
        {
1000  
        {
1001  
            return false;
1001  
            return false;
1002  
        }
1002  
        }
1003  

1003  

1004  
        std::coroutine_handle<>
1004  
        std::coroutine_handle<>
1005  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
1005  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
1006  
        {
1006  
        {
1007  
            self_->active_write_ops_ =
1007  
            self_->active_write_ops_ =
1008  
                self_->vt_->construct_write_eof_buffers_awaitable(
1008  
                self_->vt_->construct_write_eof_buffers_awaitable(
1009  
                    self_->sink_,
1009  
                    self_->sink_,
1010  
                    self_->cached_awaitable_,
1010  
                    self_->cached_awaitable_,
1011  
                    buffers_);
1011  
                    buffers_);
1012  

1012  

1013  
            if(self_->active_write_ops_->await_ready(
1013  
            if(self_->active_write_ops_->await_ready(
1014  
                self_->cached_awaitable_))
1014  
                self_->cached_awaitable_))
1015  
                return h;
1015  
                return h;
1016  

1016  

1017  
            return self_->active_write_ops_->await_suspend(
1017  
            return self_->active_write_ops_->await_suspend(
1018  
                self_->cached_awaitable_, h, env);
1018  
                self_->cached_awaitable_, h, env);
1019  
        }
1019  
        }
1020  

1020  

1021  
        io_result<std::size_t>
1021  
        io_result<std::size_t>
1022  
        await_resume()
1022  
        await_resume()
1023  
        {
1023  
        {
1024  
            struct guard {
1024  
            struct guard {
1025  
                any_buffer_sink* self;
1025  
                any_buffer_sink* self;
1026  
                ~guard() {
1026  
                ~guard() {
1027  
                    self->active_write_ops_->destroy(
1027  
                    self->active_write_ops_->destroy(
1028  
                        self->cached_awaitable_);
1028  
                        self->cached_awaitable_);
1029  
                    self->active_write_ops_ = nullptr;
1029  
                    self->active_write_ops_ = nullptr;
1030  
                }
1030  
                }
1031  
            } g{self_};
1031  
            } g{self_};
1032  
            return self_->active_write_ops_->await_resume(
1032  
            return self_->active_write_ops_->await_resume(
1033  
                self_->cached_awaitable_);
1033  
                self_->cached_awaitable_);
1034  
        }
1034  
        }
1035  
    };
1035  
    };
1036  
    return awaitable{this, buffers};
1036  
    return awaitable{this, buffers};
1037  
}
1037  
}
1038  

1038  

1039  
template<ConstBufferSequence CB>
1039  
template<ConstBufferSequence CB>
1040  
io_task<std::size_t>
1040  
io_task<std::size_t>
1041  
any_buffer_sink::write_some(CB buffers)
1041  
any_buffer_sink::write_some(CB buffers)
1042  
{
1042  
{
1043  
    buffer_param<CB> bp(buffers);
1043  
    buffer_param<CB> bp(buffers);
1044  
    auto src = bp.data();
1044  
    auto src = bp.data();
1045  
    if(src.empty())
1045  
    if(src.empty())
1046  
        co_return {{}, 0};
1046  
        co_return {{}, 0};
1047  

1047  

1048  
    // Native WriteSink path
1048  
    // Native WriteSink path
1049  
    if(vt_->construct_write_some_awaitable)
1049  
    if(vt_->construct_write_some_awaitable)
1050  
        co_return co_await write_some_(src);
1050  
        co_return co_await write_some_(src);
1051  

1051  

1052  
    // Synthesized path: prepare + buffer_copy + commit
1052  
    // Synthesized path: prepare + buffer_copy + commit
1053  
    mutable_buffer arr[detail::max_iovec_];
1053  
    mutable_buffer arr[detail::max_iovec_];
1054  
    auto dst_bufs = prepare(arr);
1054  
    auto dst_bufs = prepare(arr);
1055  
    if(dst_bufs.empty())
1055  
    if(dst_bufs.empty())
1056  
    {
1056  
    {
1057  
        auto [ec] = co_await commit(0);
1057  
        auto [ec] = co_await commit(0);
1058  
        if(ec)
1058  
        if(ec)
1059  
            co_return {ec, 0};
1059  
            co_return {ec, 0};
1060  
        dst_bufs = prepare(arr);
1060  
        dst_bufs = prepare(arr);
1061  
        if(dst_bufs.empty())
1061  
        if(dst_bufs.empty())
1062  
            co_return {{}, 0};
1062  
            co_return {{}, 0};
1063  
    }
1063  
    }
1064  

1064  

1065  
    auto n = buffer_copy(dst_bufs, src);
1065  
    auto n = buffer_copy(dst_bufs, src);
1066  
    auto [ec] = co_await commit(n);
1066  
    auto [ec] = co_await commit(n);
1067  
    if(ec)
1067  
    if(ec)
1068  
        co_return {ec, 0};
1068  
        co_return {ec, 0};
1069  
    co_return {{}, n};
1069  
    co_return {{}, n};
1070  
}
1070  
}
1071  

1071  

1072  
template<ConstBufferSequence CB>
1072  
template<ConstBufferSequence CB>
1073  
io_task<std::size_t>
1073  
io_task<std::size_t>
1074  
any_buffer_sink::write(CB buffers)
1074  
any_buffer_sink::write(CB buffers)
1075  
{
1075  
{
1076  
    buffer_param<CB> bp(buffers);
1076  
    buffer_param<CB> bp(buffers);
1077  
    std::size_t total = 0;
1077  
    std::size_t total = 0;
1078  

1078  

1079  
    // Native WriteSink path
1079  
    // Native WriteSink path
1080  
    if(vt_->construct_write_awaitable)
1080  
    if(vt_->construct_write_awaitable)
1081  
    {
1081  
    {
1082  
        for(;;)
1082  
        for(;;)
1083  
        {
1083  
        {
1084  
            auto bufs = bp.data();
1084  
            auto bufs = bp.data();
1085  
            if(bufs.empty())
1085  
            if(bufs.empty())
1086  
                break;
1086  
                break;
1087  

1087  

1088  
            auto [ec, n] = co_await write_(bufs);
1088  
            auto [ec, n] = co_await write_(bufs);
1089  
            total += n;
1089  
            total += n;
1090  
            if(ec)
1090  
            if(ec)
1091  
                co_return {ec, total};
1091  
                co_return {ec, total};
1092  
            bp.consume(n);
1092  
            bp.consume(n);
1093  
        }
1093  
        }
1094  
        co_return {{}, total};
1094  
        co_return {{}, total};
1095  
    }
1095  
    }
1096  

1096  

1097  
    // Synthesized path: prepare + buffer_copy + commit
1097  
    // Synthesized path: prepare + buffer_copy + commit
1098  
    for(;;)
1098  
    for(;;)
1099  
    {
1099  
    {
1100  
        auto src = bp.data();
1100  
        auto src = bp.data();
1101  
        if(src.empty())
1101  
        if(src.empty())
1102  
            break;
1102  
            break;
1103  

1103  

1104  
        mutable_buffer arr[detail::max_iovec_];
1104  
        mutable_buffer arr[detail::max_iovec_];
1105  
        auto dst_bufs = prepare(arr);
1105  
        auto dst_bufs = prepare(arr);
1106  
        if(dst_bufs.empty())
1106  
        if(dst_bufs.empty())
1107  
        {
1107  
        {
1108  
            auto [ec] = co_await commit(0);
1108  
            auto [ec] = co_await commit(0);
1109  
            if(ec)
1109  
            if(ec)
1110  
                co_return {ec, total};
1110  
                co_return {ec, total};
1111  
            continue;
1111  
            continue;
1112  
        }
1112  
        }
1113  

1113  

1114  
        auto n = buffer_copy(dst_bufs, src);
1114  
        auto n = buffer_copy(dst_bufs, src);
1115  
        auto [ec] = co_await commit(n);
1115  
        auto [ec] = co_await commit(n);
1116  
        if(ec)
1116  
        if(ec)
1117  
            co_return {ec, total};
1117  
            co_return {ec, total};
1118  
        bp.consume(n);
1118  
        bp.consume(n);
1119  
        total += n;
1119  
        total += n;
1120  
    }
1120  
    }
1121  

1121  

1122  
    co_return {{}, total};
1122  
    co_return {{}, total};
1123  
}
1123  
}
1124  

1124  

1125  
inline auto
1125  
inline auto
1126  
any_buffer_sink::write_eof()
1126  
any_buffer_sink::write_eof()
1127  
{
1127  
{
1128  
    struct awaitable
1128  
    struct awaitable
1129  
    {
1129  
    {
1130  
        any_buffer_sink* self_;
1130  
        any_buffer_sink* self_;
1131  

1131  

1132  
        bool
1132  
        bool
1133  
        await_ready()
1133  
        await_ready()
1134  
        {
1134  
        {
1135  
            if(self_->vt_->construct_write_eof_awaitable)
1135  
            if(self_->vt_->construct_write_eof_awaitable)
1136  
            {
1136  
            {
1137  
                // Native WriteSink: forward to underlying write_eof()
1137  
                // Native WriteSink: forward to underlying write_eof()
1138  
                self_->active_ops_ =
1138  
                self_->active_ops_ =
1139  
                    self_->vt_->construct_write_eof_awaitable(
1139  
                    self_->vt_->construct_write_eof_awaitable(
1140  
                        self_->sink_,
1140  
                        self_->sink_,
1141  
                        self_->cached_awaitable_);
1141  
                        self_->cached_awaitable_);
1142  
            }
1142  
            }
1143  
            else
1143  
            else
1144  
            {
1144  
            {
1145  
                // Synthesized: commit_eof(0)
1145  
                // Synthesized: commit_eof(0)
1146  
                self_->active_ops_ =
1146  
                self_->active_ops_ =
1147  
                    self_->vt_->construct_commit_eof_awaitable(
1147  
                    self_->vt_->construct_commit_eof_awaitable(
1148  
                        self_->sink_,
1148  
                        self_->sink_,
1149  
                        self_->cached_awaitable_,
1149  
                        self_->cached_awaitable_,
1150  
                        0);
1150  
                        0);
1151  
            }
1151  
            }
1152  
            return self_->active_ops_->await_ready(
1152  
            return self_->active_ops_->await_ready(
1153  
                self_->cached_awaitable_);
1153  
                self_->cached_awaitable_);
1154  
        }
1154  
        }
1155  

1155  

1156  
        std::coroutine_handle<>
1156  
        std::coroutine_handle<>
1157  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
1157  
        await_suspend(std::coroutine_handle<> h, io_env const* env)
1158  
        {
1158  
        {
1159  
            return self_->active_ops_->await_suspend(
1159  
            return self_->active_ops_->await_suspend(
1160  
                self_->cached_awaitable_, h, env);
1160  
                self_->cached_awaitable_, h, env);
1161  
        }
1161  
        }
1162  

1162  

1163  
        io_result<>
1163  
        io_result<>
1164  
        await_resume()
1164  
        await_resume()
1165  
        {
1165  
        {
1166  
            struct guard {
1166  
            struct guard {
1167  
                any_buffer_sink* self;
1167  
                any_buffer_sink* self;
1168  
                ~guard() {
1168  
                ~guard() {
1169  
                    self->active_ops_->destroy(self->cached_awaitable_);
1169  
                    self->active_ops_->destroy(self->cached_awaitable_);
1170  
                    self->active_ops_ = nullptr;
1170  
                    self->active_ops_ = nullptr;
1171  
                }
1171  
                }
1172  
            } g{self_};
1172  
            } g{self_};
1173  
            return self_->active_ops_->await_resume(
1173  
            return self_->active_ops_->await_resume(
1174  
                self_->cached_awaitable_);
1174  
                self_->cached_awaitable_);
1175  
        }
1175  
        }
1176  
    };
1176  
    };
1177  
    return awaitable{this};
1177  
    return awaitable{this};
1178  
}
1178  
}
1179  

1179  

1180  
template<ConstBufferSequence CB>
1180  
template<ConstBufferSequence CB>
1181  
io_task<std::size_t>
1181  
io_task<std::size_t>
1182  
any_buffer_sink::write_eof(CB buffers)
1182  
any_buffer_sink::write_eof(CB buffers)
1183  
{
1183  
{
1184  
    // Native WriteSink path
1184  
    // Native WriteSink path
1185  
    if(vt_->construct_write_eof_buffers_awaitable)
1185  
    if(vt_->construct_write_eof_buffers_awaitable)
1186  
    {
1186  
    {
1187  
        const_buffer_param<CB> bp(buffers);
1187  
        const_buffer_param<CB> bp(buffers);
1188  
        std::size_t total = 0;
1188  
        std::size_t total = 0;
1189  

1189  

1190  
        for(;;)
1190  
        for(;;)
1191  
        {
1191  
        {
1192  
            auto bufs = bp.data();
1192  
            auto bufs = bp.data();
1193  
            if(bufs.empty())
1193  
            if(bufs.empty())
1194  
            {
1194  
            {
1195  
                auto [ec] = co_await write_eof();
1195  
                auto [ec] = co_await write_eof();
1196  
                co_return {ec, total};
1196  
                co_return {ec, total};
1197  
            }
1197  
            }
1198  

1198  

1199  
            if(!bp.more())
1199  
            if(!bp.more())
1200  
            {
1200  
            {
1201  
                // Last window: send atomically with EOF
1201  
                // Last window: send atomically with EOF
1202  
                auto [ec, n] = co_await write_eof_buffers_(bufs);
1202  
                auto [ec, n] = co_await write_eof_buffers_(bufs);
1203  
                total += n;
1203  
                total += n;
1204  
                co_return {ec, total};
1204  
                co_return {ec, total};
1205  
            }
1205  
            }
1206  

1206  

1207  
            auto [ec, n] = co_await write_(bufs);
1207  
            auto [ec, n] = co_await write_(bufs);
1208  
            total += n;
1208  
            total += n;
1209  
            if(ec)
1209  
            if(ec)
1210  
                co_return {ec, total};
1210  
                co_return {ec, total};
1211  
            bp.consume(n);
1211  
            bp.consume(n);
1212  
        }
1212  
        }
1213  
    }
1213  
    }
1214  

1214  

1215  
    // Synthesized path: prepare + buffer_copy + commit + commit_eof
1215  
    // Synthesized path: prepare + buffer_copy + commit + commit_eof
1216  
    buffer_param<CB> bp(buffers);
1216  
    buffer_param<CB> bp(buffers);
1217  
    std::size_t total = 0;
1217  
    std::size_t total = 0;
1218  

1218  

1219  
    for(;;)
1219  
    for(;;)
1220  
    {
1220  
    {
1221  
        auto src = bp.data();
1221  
        auto src = bp.data();
1222  
        if(src.empty())
1222  
        if(src.empty())
1223  
            break;
1223  
            break;
1224  

1224  

1225  
        mutable_buffer arr[detail::max_iovec_];
1225  
        mutable_buffer arr[detail::max_iovec_];
1226  
        auto dst_bufs = prepare(arr);
1226  
        auto dst_bufs = prepare(arr);
1227  
        if(dst_bufs.empty())
1227  
        if(dst_bufs.empty())
1228  
        {
1228  
        {
1229  
            auto [ec] = co_await commit(0);
1229  
            auto [ec] = co_await commit(0);
1230  
            if(ec)
1230  
            if(ec)
1231  
                co_return {ec, total};
1231  
                co_return {ec, total};
1232  
            continue;
1232  
            continue;
1233  
        }
1233  
        }
1234  

1234  

1235  
        auto n = buffer_copy(dst_bufs, src);
1235  
        auto n = buffer_copy(dst_bufs, src);
1236  
        auto [ec] = co_await commit(n);
1236  
        auto [ec] = co_await commit(n);
1237  
        if(ec)
1237  
        if(ec)
1238  
            co_return {ec, total};
1238  
            co_return {ec, total};
1239  
        bp.consume(n);
1239  
        bp.consume(n);
1240  
        total += n;
1240  
        total += n;
1241  
    }
1241  
    }
1242  

1242  

1243  
    auto [ec] = co_await commit_eof(0);
1243  
    auto [ec] = co_await commit_eof(0);
1244  
    if(ec)
1244  
    if(ec)
1245  
        co_return {ec, total};
1245  
        co_return {ec, total};
1246  

1246  

1247  
    co_return {{}, total};
1247  
    co_return {{}, total};
1248  
}
1248  
}
1249  

1249  

1250  
static_assert(BufferSink<any_buffer_sink>);
1250  
static_assert(BufferSink<any_buffer_sink>);
1251  
static_assert(WriteSink<any_buffer_sink>);
1251  
static_assert(WriteSink<any_buffer_sink>);
1252  

1252  

1253  
} // namespace capy
1253  
} // namespace capy
1254  
} // namespace boost
1254  
} // namespace boost
1255  

1255  

1256  
#endif
1256  
#endif