LCOV - code coverage report
Current view: top level - capy/io - any_write_sink.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 90.6 % 212 192 20
Test Date: 2026-03-09 22:47:34 Functions: 77.9 % 77 60 17

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
      11                 : #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/detail/await_suspend_helper.hpp>
      15                 : #include <boost/capy/buffers.hpp>
      16                 : #include <boost/capy/buffers/buffer_array.hpp>
      17                 : #include <boost/capy/buffers/buffer_param.hpp>
      18                 : #include <boost/capy/concept/io_awaitable.hpp>
      19                 : #include <boost/capy/concept/write_sink.hpp>
      20                 : #include <coroutine>
      21                 : #include <boost/capy/ex/io_env.hpp>
      22                 : #include <boost/capy/io_result.hpp>
      23                 : #include <boost/capy/io_task.hpp>
      24                 : 
      25                 : #include <concepts>
      26                 : #include <coroutine>
      27                 : #include <cstddef>
      28                 : #include <exception>
      29                 : #include <new>
      30                 : #include <span>
      31                 : #include <stop_token>
      32                 : #include <system_error>
      33                 : #include <utility>
      34                 : 
      35                 : namespace boost {
      36                 : namespace capy {
      37                 : 
      38                 : /** Type-erased wrapper for any WriteSink.
      39                 : 
      40                 :     This class provides type erasure for any type satisfying the
      41                 :     @ref WriteSink concept, enabling runtime polymorphism for
      42                 :     sink write operations. It uses cached awaitable storage to achieve
      43                 :     zero steady-state allocation after construction.
      44                 : 
      45                 :     The wrapper supports two construction modes:
      46                 :     - **Owning**: Pass by value to transfer ownership. The wrapper
      47                 :       allocates storage and owns the sink.
      48                 :     - **Reference**: Pass a pointer to wrap without ownership. The
      49                 :       pointed-to sink must outlive this wrapper.
      50                 : 
      51                 :     @par Awaitable Preallocation
      52                 :     The constructor preallocates storage for the type-erased awaitable.
      53                 :     This reserves all virtual address space at server startup
      54                 :     so memory usage can be measured up front, rather than
      55                 :     allocating piecemeal as traffic arrives.
      56                 : 
      57                 :     @par Immediate Completion
      58                 :     Operations complete immediately without suspending when the
      59                 :     buffer sequence is empty, or when the underlying sink's
      60                 :     awaitable reports readiness via `await_ready`.
      61                 : 
      62                 :     @par Thread Safety
      63                 :     Not thread-safe. Concurrent operations on the same wrapper
      64                 :     are undefined behavior.
      65                 : 
      66                 :     @par Example
      67                 :     @code
      68                 :     // Owning - takes ownership of the sink
      69                 :     any_write_sink ws(some_sink{args...});
      70                 : 
      71                 :     // Reference - wraps without ownership
      72                 :     some_sink sink;
      73                 :     any_write_sink ws(&sink);
      74                 : 
      75                 :     const_buffer buf(data, size);
      76                 :     auto [ec, n] = co_await ws.write(std::span(&buf, 1));
      77                 :     auto [ec2] = co_await ws.write_eof();
      78                 :     @endcode
      79                 : 
      80                 :     @see any_write_stream, WriteSink
      81                 : */
      82                 : class any_write_sink
      83                 : {
      84                 :     struct vtable;
      85                 :     struct write_awaitable_ops;
      86                 :     struct eof_awaitable_ops;
      87                 : 
      88                 :     template<WriteSink S>
      89                 :     struct vtable_for_impl;
      90                 : 
      91                 :     void* sink_ = nullptr;
      92                 :     vtable const* vt_ = nullptr;
      93                 :     void* cached_awaitable_ = nullptr;
      94                 :     void* storage_ = nullptr;
      95                 :     write_awaitable_ops const* active_write_ops_ = nullptr;
      96                 :     eof_awaitable_ops const* active_eof_ops_ = nullptr;
      97                 : 
      98                 : public:
      99                 :     /** Destructor.
     100                 : 
     101                 :         Destroys the owned sink (if any) and releases the cached
     102                 :         awaitable storage.
     103                 :     */
     104                 :     ~any_write_sink();
     105                 : 
     106                 :     /** Construct a default instance.
     107                 : 
     108                 :         Constructs an empty wrapper. Operations on a default-constructed
     109                 :         wrapper result in undefined behavior.
     110                 :     */
     111                 :     any_write_sink() = default;
     112                 : 
     113                 :     /** Non-copyable.
     114                 : 
     115                 :         The awaitable cache is per-instance and cannot be shared.
     116                 :     */
     117                 :     any_write_sink(any_write_sink const&) = delete;
     118                 :     any_write_sink& operator=(any_write_sink const&) = delete;
     119                 : 
     120                 :     /** Construct by moving.
     121                 : 
     122                 :         Transfers ownership of the wrapped sink (if owned) and
     123                 :         cached awaitable storage from `other`. After the move, `other` is
     124                 :         in a default-constructed state.
     125                 : 
     126                 :         @param other The wrapper to move from.
     127                 :     */
     128 HIT           1 :     any_write_sink(any_write_sink&& other) noexcept
     129               1 :         : sink_(std::exchange(other.sink_, nullptr))
     130               1 :         , vt_(std::exchange(other.vt_, nullptr))
     131               1 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     132               1 :         , storage_(std::exchange(other.storage_, nullptr))
     133               1 :         , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
     134               1 :         , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
     135                 :     {
     136               1 :     }
     137                 : 
     138                 :     /** Assign by moving.
     139                 : 
     140                 :         Destroys any owned sink and releases existing resources,
     141                 :         then transfers ownership from `other`.
     142                 : 
     143                 :         @param other The wrapper to move from.
     144                 :         @return Reference to this wrapper.
     145                 :     */
     146                 :     any_write_sink&
     147                 :     operator=(any_write_sink&& other) noexcept;
     148                 : 
     149                 :     /** Construct by taking ownership of a WriteSink.
     150                 : 
     151                 :         Allocates storage and moves the sink into this wrapper.
     152                 :         The wrapper owns the sink and will destroy it.
     153                 : 
     154                 :         @param s The sink to take ownership of.
     155                 :     */
     156                 :     template<WriteSink S>
     157                 :         requires (!std::same_as<std::decay_t<S>, any_write_sink>)
     158                 :     any_write_sink(S s);
     159                 : 
     160                 :     /** Construct by wrapping a WriteSink without ownership.
     161                 : 
     162                 :         Wraps the given sink by pointer. The sink must remain
     163                 :         valid for the lifetime of this wrapper.
     164                 : 
     165                 :         @param s Pointer to the sink to wrap.
     166                 :     */
     167                 :     template<WriteSink S>
     168                 :     any_write_sink(S* s);
     169                 : 
     170                 :     /** Check if the wrapper contains a valid sink.
     171                 : 
     172                 :         @return `true` if wrapping a sink, `false` if default-constructed
     173                 :             or moved-from.
     174                 :     */
     175                 :     bool
     176              15 :     has_value() const noexcept
     177                 :     {
     178              15 :         return sink_ != nullptr;
     179                 :     }
     180                 : 
     181                 :     /** Check if the wrapper contains a valid sink.
     182                 : 
     183                 :         @return `true` if wrapping a sink, `false` if default-constructed
     184                 :             or moved-from.
     185                 :     */
     186                 :     explicit
     187               2 :     operator bool() const noexcept
     188                 :     {
     189               2 :         return has_value();
     190                 :     }
     191                 : 
     192                 :     /** Initiate a partial write operation.
     193                 : 
     194                 :         Writes one or more bytes from the provided buffer sequence.
     195                 :         May consume less than the full sequence.
     196                 : 
     197                 :         @param buffers The buffer sequence containing data to write.
     198                 : 
     199                 :         @return An awaitable yielding `(error_code,std::size_t)`.
     200                 : 
     201                 :         @par Immediate Completion
     202                 :         The operation completes immediately without suspending
     203                 :         the calling coroutine when:
     204                 :         @li The buffer sequence is empty, returning `{error_code{}, 0}`.
     205                 :         @li The underlying sink's awaitable reports immediate
     206                 :             readiness via `await_ready`.
     207                 : 
     208                 :         @note This is a partial operation and may not process the
     209                 :         entire buffer sequence. Use @ref write for guaranteed
     210                 :         complete transfer.
     211                 : 
     212                 :         @par Preconditions
     213                 :         The wrapper must contain a valid sink (`has_value() == true`).
     214                 :     */
     215                 :     template<ConstBufferSequence CB>
     216                 :     auto
     217                 :     write_some(CB buffers);
     218                 : 
     219                 :     /** Initiate a complete write operation.
     220                 : 
     221                 :         Writes data from the provided buffer sequence. The operation
     222                 :         completes when all bytes have been consumed, or an error
     223                 :         occurs. Forwards to the underlying sink's `write` operation,
     224                 :         windowed through @ref buffer_param when the sequence exceeds
     225                 :         the per-call buffer limit.
     226                 : 
     227                 :         @param buffers The buffer sequence containing data to write.
     228                 : 
     229                 :         @return An awaitable yielding `(error_code,std::size_t)`.
     230                 : 
     231                 :         @par Immediate Completion
     232                 :         The operation completes immediately without suspending
     233                 :         the calling coroutine when:
     234                 :         @li The buffer sequence is empty, returning `{error_code{}, 0}`.
     235                 :         @li Every underlying `write` call completes
     236                 :             immediately (the wrapped sink reports readiness
     237                 :             via `await_ready` on each iteration).
     238                 : 
     239                 :         @par Preconditions
     240                 :         The wrapper must contain a valid sink (`has_value() == true`).
     241                 :     */
     242                 :     template<ConstBufferSequence CB>
     243                 :     io_task<std::size_t>
     244                 :     write(CB buffers);
     245                 : 
     246                 :     /** Atomically write data and signal end-of-stream.
     247                 : 
     248                 :         Writes all data from the buffer sequence and then signals
     249                 :         end-of-stream. The implementation decides how to partition
     250                 :         the data across calls to the underlying sink's @ref write
     251                 :         and `write_eof`. When the caller's buffer sequence is
     252                 :         non-empty, the final call to the underlying sink is always
     253                 :         `write_eof` with a non-empty buffer sequence. When the
     254                 :         caller's buffer sequence is empty, only `write_eof()` with
     255                 :         no data is called.
     256                 : 
     257                 :         @param buffers The buffer sequence containing data to write.
     258                 : 
     259                 :         @return An awaitable yielding `(error_code,std::size_t)`.
     260                 : 
     261                 :         @par Immediate Completion
     262                 :         The operation completes immediately without suspending
     263                 :         the calling coroutine when:
     264                 :         @li The buffer sequence is empty. Only the @ref write_eof()
     265                 :             call is performed.
     266                 :         @li All underlying operations complete immediately (the
     267                 :             wrapped sink reports readiness via `await_ready`).
     268                 : 
     269                 :         @par Preconditions
     270                 :         The wrapper must contain a valid sink (`has_value() == true`).
     271                 :     */
     272                 :     template<ConstBufferSequence CB>
     273                 :     io_task<std::size_t>
     274                 :     write_eof(CB buffers);
     275                 : 
     276                 :     /** Signal end of data.
     277                 : 
     278                 :         Indicates that no more data will be written to the sink.
     279                 :         The operation completes when the sink is finalized, or
     280                 :         an error occurs.
     281                 : 
     282                 :         @return An awaitable yielding `(error_code)`.
     283                 : 
     284                 :         @par Immediate Completion
     285                 :         The operation completes immediately without suspending
     286                 :         the calling coroutine when the underlying sink's awaitable
     287                 :         reports immediate readiness via `await_ready`.
     288                 : 
     289                 :         @par Preconditions
     290                 :         The wrapper must contain a valid sink (`has_value() == true`).
     291                 :     */
     292                 :     auto
     293                 :     write_eof();
     294                 : 
     295                 : protected:
     296                 :     /** Rebind to a new sink after move.
     297                 : 
     298                 :         Updates the internal pointer to reference a new sink object.
     299                 :         Used by owning wrappers after move assignment when the owned
     300                 :         object has moved to a new location.
     301                 : 
     302                 :         @param new_sink The new sink to bind to. Must be the same
     303                 :             type as the original sink.
     304                 : 
     305                 :         @note Terminates if called with a sink of different type
     306                 :             than the original.
     307                 :     */
     308                 :     template<WriteSink S>
     309                 :     void
     310                 :     rebind(S& new_sink) noexcept
     311                 :     {
     312                 :         if(vt_ != &vtable_for_impl<S>::value)
     313                 :             std::terminate();
     314                 :         sink_ = &new_sink;
     315                 :     }
     316                 : 
     317                 : private:
     318                 :     auto
     319                 :     write_some_(std::span<const_buffer const> buffers);
     320                 : 
     321                 :     auto
     322                 :     write_(std::span<const_buffer const> buffers);
     323                 : 
     324                 :     auto
     325                 :     write_eof_buffers_(std::span<const_buffer const> buffers);
     326                 : };
     327                 : 
     328                 : struct any_write_sink::write_awaitable_ops
     329                 : {
     330                 :     bool (*await_ready)(void*);
     331                 :     std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
     332                 :     io_result<std::size_t> (*await_resume)(void*);
     333                 :     void (*destroy)(void*) noexcept;
     334                 : };
     335                 : 
     336                 : struct any_write_sink::eof_awaitable_ops
     337                 : {
     338                 :     bool (*await_ready)(void*);
     339                 :     std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
     340                 :     io_result<> (*await_resume)(void*);
     341                 :     void (*destroy)(void*) noexcept;
     342                 : };
     343                 : 
     344                 : struct any_write_sink::vtable
     345                 : {
     346                 :     write_awaitable_ops const* (*construct_write_some_awaitable)(
     347                 :         void* sink,
     348                 :         void* storage,
     349                 :         std::span<const_buffer const> buffers);
     350                 :     write_awaitable_ops const* (*construct_write_awaitable)(
     351                 :         void* sink,
     352                 :         void* storage,
     353                 :         std::span<const_buffer const> buffers);
     354                 :     write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
     355                 :         void* sink,
     356                 :         void* storage,
     357                 :         std::span<const_buffer const> buffers);
     358                 :     eof_awaitable_ops const* (*construct_eof_awaitable)(
     359                 :         void* sink,
     360                 :         void* storage);
     361                 :     std::size_t awaitable_size;
     362                 :     std::size_t awaitable_align;
     363                 :     void (*destroy)(void*) noexcept;
     364                 : };
     365                 : 
     366                 : template<WriteSink S>
     367                 : struct any_write_sink::vtable_for_impl
     368                 : {
     369                 :     using WriteSomeAwaitable = decltype(std::declval<S&>().write_some(
     370                 :         std::span<const_buffer const>{}));
     371                 :     using WriteAwaitable = decltype(std::declval<S&>().write(
     372                 :         std::span<const_buffer const>{}));
     373                 :     using WriteEofBuffersAwaitable = decltype(std::declval<S&>().write_eof(
     374                 :         std::span<const_buffer const>{}));
     375                 :     using EofAwaitable = decltype(std::declval<S&>().write_eof());
     376                 : 
     377                 :     static void
     378               6 :     do_destroy_impl(void* sink) noexcept
     379                 :     {
     380               6 :         static_cast<S*>(sink)->~S();
     381               6 :     }
     382                 : 
     383                 :     static write_awaitable_ops const*
     384              40 :     construct_write_some_awaitable_impl(
     385                 :         void* sink,
     386                 :         void* storage,
     387                 :         std::span<const_buffer const> buffers)
     388                 :     {
     389              40 :         auto& s = *static_cast<S*>(sink);
     390              40 :         ::new(storage) WriteSomeAwaitable(s.write_some(buffers));
     391                 : 
     392                 :         static constexpr write_awaitable_ops ops = {
     393              40 :             +[](void* p) {
     394              40 :                 return static_cast<WriteSomeAwaitable*>(p)->await_ready();
     395                 :             },
     396               2 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     397               2 :                 return detail::call_await_suspend(
     398               2 :                     static_cast<WriteSomeAwaitable*>(p), h, env);
     399                 :             },
     400              38 :             +[](void* p) {
     401              38 :                 return static_cast<WriteSomeAwaitable*>(p)->await_resume();
     402                 :             },
     403              42 :             +[](void* p) noexcept {
     404               2 :                 static_cast<WriteSomeAwaitable*>(p)->~WriteSomeAwaitable();
     405                 :             }
     406                 :         };
     407              40 :         return &ops;
     408                 :     }
     409                 : 
     410                 :     static write_awaitable_ops const*
     411              78 :     construct_write_awaitable_impl(
     412                 :         void* sink,
     413                 :         void* storage,
     414                 :         std::span<const_buffer const> buffers)
     415                 :     {
     416              78 :         auto& s = *static_cast<S*>(sink);
     417              78 :         ::new(storage) WriteAwaitable(s.write(buffers));
     418                 : 
     419                 :         static constexpr write_awaitable_ops ops = {
     420              78 :             +[](void* p) {
     421              78 :                 return static_cast<WriteAwaitable*>(p)->await_ready();
     422                 :             },
     423 MIS           0 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     424               0 :                 return detail::call_await_suspend(
     425               0 :                     static_cast<WriteAwaitable*>(p), h, env);
     426                 :             },
     427 HIT          78 :             +[](void* p) {
     428              78 :                 return static_cast<WriteAwaitable*>(p)->await_resume();
     429                 :             },
     430              78 :             +[](void* p) noexcept {
     431 MIS           0 :                 static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
     432                 :             }
     433                 :         };
     434 HIT          78 :         return &ops;
     435                 :     }
     436                 : 
     437                 :     static write_awaitable_ops const*
     438              16 :     construct_write_eof_buffers_awaitable_impl(
     439                 :         void* sink,
     440                 :         void* storage,
     441                 :         std::span<const_buffer const> buffers)
     442                 :     {
     443              16 :         auto& s = *static_cast<S*>(sink);
     444              16 :         ::new(storage) WriteEofBuffersAwaitable(s.write_eof(buffers));
     445                 : 
     446                 :         static constexpr write_awaitable_ops ops = {
     447              16 :             +[](void* p) {
     448              16 :                 return static_cast<WriteEofBuffersAwaitable*>(p)->await_ready();
     449                 :             },
     450 MIS           0 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     451               0 :                 return detail::call_await_suspend(
     452               0 :                     static_cast<WriteEofBuffersAwaitable*>(p), h, env);
     453                 :             },
     454 HIT          16 :             +[](void* p) {
     455              16 :                 return static_cast<WriteEofBuffersAwaitable*>(p)->await_resume();
     456                 :             },
     457              16 :             +[](void* p) noexcept {
     458 MIS           0 :                 static_cast<WriteEofBuffersAwaitable*>(p)->~WriteEofBuffersAwaitable();
     459                 :             }
     460                 :         };
     461 HIT          16 :         return &ops;
     462                 :     }
     463                 : 
     464                 :     static eof_awaitable_ops const*
     465              17 :     construct_eof_awaitable_impl(
     466                 :         void* sink,
     467                 :         void* storage)
     468                 :     {
     469              17 :         auto& s = *static_cast<S*>(sink);
     470              17 :         ::new(storage) EofAwaitable(s.write_eof());
     471                 : 
     472                 :         static constexpr eof_awaitable_ops ops = {
     473              17 :             +[](void* p) {
     474              17 :                 return static_cast<EofAwaitable*>(p)->await_ready();
     475                 :             },
     476               1 :             +[](void* p, std::coroutine_handle<> h, io_env const* env) {
     477               1 :                 return detail::call_await_suspend(
     478               1 :                     static_cast<EofAwaitable*>(p), h, env);
     479                 :             },
     480              16 :             +[](void* p) {
     481              16 :                 return static_cast<EofAwaitable*>(p)->await_resume();
     482                 :             },
     483              18 :             +[](void* p) noexcept {
     484               1 :                 static_cast<EofAwaitable*>(p)->~EofAwaitable();
     485                 :             }
     486                 :         };
     487              17 :         return &ops;
     488                 :     }
     489                 : 
     490                 :     static constexpr std::size_t max4(
     491                 :         std::size_t a, std::size_t b,
     492                 :         std::size_t c, std::size_t d) noexcept
     493                 :     {
     494                 :         std::size_t ab = a > b ? a : b;
     495                 :         std::size_t cd = c > d ? c : d;
     496                 :         return ab > cd ? ab : cd;
     497                 :     }
     498                 : 
     499                 :     static constexpr std::size_t max_awaitable_size =
     500                 :         max4(sizeof(WriteSomeAwaitable),
     501                 :              sizeof(WriteAwaitable),
     502                 :              sizeof(WriteEofBuffersAwaitable),
     503                 :              sizeof(EofAwaitable));
     504                 : 
     505                 :     static constexpr std::size_t max_awaitable_align =
     506                 :         max4(alignof(WriteSomeAwaitable),
     507                 :              alignof(WriteAwaitable),
     508                 :              alignof(WriteEofBuffersAwaitable),
     509                 :              alignof(EofAwaitable));
     510                 : 
     511                 :     static constexpr vtable value = {
     512                 :         &construct_write_some_awaitable_impl,
     513                 :         &construct_write_awaitable_impl,
     514                 :         &construct_write_eof_buffers_awaitable_impl,
     515                 :         &construct_eof_awaitable_impl,
     516                 :         max_awaitable_size,
     517                 :         max_awaitable_align,
     518                 :         &do_destroy_impl
     519                 :     };
     520                 : };
     521                 : 
     522                 : inline
     523             129 : any_write_sink::~any_write_sink()
     524                 : {
     525             129 :     if(storage_)
     526                 :     {
     527               6 :         vt_->destroy(sink_);
     528               6 :         ::operator delete(storage_);
     529                 :     }
     530             129 :     if(cached_awaitable_)
     531                 :     {
     532             124 :         if(active_write_ops_)
     533               1 :             active_write_ops_->destroy(cached_awaitable_);
     534             123 :         else if(active_eof_ops_)
     535               1 :             active_eof_ops_->destroy(cached_awaitable_);
     536             124 :         ::operator delete(cached_awaitable_);
     537                 :     }
     538             129 : }
     539                 : 
     540                 : inline any_write_sink&
     541               2 : any_write_sink::operator=(any_write_sink&& other) noexcept
     542                 : {
     543               2 :     if(this != &other)
     544                 :     {
     545               2 :         if(storage_)
     546                 :         {
     547 MIS           0 :             vt_->destroy(sink_);
     548               0 :             ::operator delete(storage_);
     549                 :         }
     550 HIT           2 :         if(cached_awaitable_)
     551                 :         {
     552               1 :             if(active_write_ops_)
     553               1 :                 active_write_ops_->destroy(cached_awaitable_);
     554 MIS           0 :             else if(active_eof_ops_)
     555               0 :                 active_eof_ops_->destroy(cached_awaitable_);
     556 HIT           1 :             ::operator delete(cached_awaitable_);
     557                 :         }
     558               2 :         sink_ = std::exchange(other.sink_, nullptr);
     559               2 :         vt_ = std::exchange(other.vt_, nullptr);
     560               2 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     561               2 :         storage_ = std::exchange(other.storage_, nullptr);
     562               2 :         active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
     563               2 :         active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
     564                 :     }
     565               2 :     return *this;
     566                 : }
     567                 : 
     568                 : template<WriteSink S>
     569                 :     requires (!std::same_as<std::decay_t<S>, any_write_sink>)
     570               6 : any_write_sink::any_write_sink(S s)
     571               6 :     : vt_(&vtable_for_impl<S>::value)
     572                 : {
     573                 :     struct guard {
     574                 :         any_write_sink* self;
     575                 :         bool committed = false;
     576               6 :         ~guard() {
     577               6 :             if(!committed && self->storage_) {
     578 MIS           0 :                 self->vt_->destroy(self->sink_);
     579               0 :                 ::operator delete(self->storage_);
     580               0 :                 self->storage_ = nullptr;
     581               0 :                 self->sink_ = nullptr;
     582                 :             }
     583 HIT           6 :         }
     584               6 :     } g{this};
     585                 : 
     586               6 :     storage_ = ::operator new(sizeof(S));
     587               6 :     sink_ = ::new(storage_) S(std::move(s));
     588                 : 
     589                 :     // Preallocate the awaitable storage (sized for max of write/eof)
     590               6 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     591                 : 
     592               6 :     g.committed = true;
     593               6 : }
     594                 : 
     595                 : template<WriteSink S>
     596             119 : any_write_sink::any_write_sink(S* s)
     597             119 :     : sink_(s)
     598             119 :     , vt_(&vtable_for_impl<S>::value)
     599                 : {
     600                 :     // Preallocate the awaitable storage (sized for max of write/eof)
     601             119 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     602             119 : }
     603                 : 
     604                 : inline auto
     605                 : any_write_sink::write_some_(
     606                 :     std::span<const_buffer const> buffers)
     607                 : {
     608                 :     struct awaitable
     609                 :     {
     610                 :         any_write_sink* self_;
     611                 :         std::span<const_buffer const> buffers_;
     612                 : 
     613                 :         bool
     614                 :         await_ready() const noexcept
     615                 :         {
     616                 :             return false;
     617                 :         }
     618                 : 
     619                 :         std::coroutine_handle<>
     620                 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     621                 :         {
     622                 :             self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
     623                 :                 self_->sink_,
     624                 :                 self_->cached_awaitable_,
     625                 :                 buffers_);
     626                 : 
     627                 :             if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
     628                 :                 return h;
     629                 : 
     630                 :             return self_->active_write_ops_->await_suspend(
     631                 :                 self_->cached_awaitable_, h, env);
     632                 :         }
     633                 : 
     634                 :         io_result<std::size_t>
     635                 :         await_resume()
     636                 :         {
     637                 :             struct guard {
     638                 :                 any_write_sink* self;
     639                 :                 ~guard() {
     640                 :                     self->active_write_ops_->destroy(self->cached_awaitable_);
     641                 :                     self->active_write_ops_ = nullptr;
     642                 :                 }
     643                 :             } g{self_};
     644                 :             return self_->active_write_ops_->await_resume(
     645                 :                 self_->cached_awaitable_);
     646                 :         }
     647                 :     };
     648                 :     return awaitable{this, buffers};
     649                 : }
     650                 : 
     651                 : inline auto
     652              78 : any_write_sink::write_(
     653                 :     std::span<const_buffer const> buffers)
     654                 : {
     655                 :     struct awaitable
     656                 :     {
     657                 :         any_write_sink* self_;
     658                 :         std::span<const_buffer const> buffers_;
     659                 : 
     660                 :         bool
     661              78 :         await_ready() const noexcept
     662                 :         {
     663              78 :             return false;
     664                 :         }
     665                 : 
     666                 :         std::coroutine_handle<>
     667              78 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     668                 :         {
     669             156 :             self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
     670              78 :                 self_->sink_,
     671              78 :                 self_->cached_awaitable_,
     672                 :                 buffers_);
     673                 : 
     674              78 :             if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
     675              78 :                 return h;
     676                 : 
     677 MIS           0 :             return self_->active_write_ops_->await_suspend(
     678               0 :                 self_->cached_awaitable_, h, env);
     679                 :         }
     680                 : 
     681                 :         io_result<std::size_t>
     682 HIT          78 :         await_resume()
     683                 :         {
     684                 :             struct guard {
     685                 :                 any_write_sink* self;
     686              78 :                 ~guard() {
     687              78 :                     self->active_write_ops_->destroy(self->cached_awaitable_);
     688              78 :                     self->active_write_ops_ = nullptr;
     689              78 :                 }
     690              78 :             } g{self_};
     691              78 :             return self_->active_write_ops_->await_resume(
     692             135 :                 self_->cached_awaitable_);
     693              78 :         }
     694                 :     };
     695              78 :     return awaitable{this, buffers};
     696                 : }
     697                 : 
     698                 : inline auto
     699              17 : any_write_sink::write_eof()
     700                 : {
     701                 :     struct awaitable
     702                 :     {
     703                 :         any_write_sink* self_;
     704                 : 
     705                 :         bool
     706              17 :         await_ready() const noexcept
     707                 :         {
     708              17 :             return false;
     709                 :         }
     710                 : 
     711                 :         std::coroutine_handle<>
     712              17 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     713                 :         {
     714                 :             // Construct the underlying awaitable into cached storage
     715              34 :             self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
     716              17 :                 self_->sink_,
     717              17 :                 self_->cached_awaitable_);
     718                 : 
     719                 :             // Check if underlying is immediately ready
     720              17 :             if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
     721              16 :                 return h;
     722                 : 
     723                 :             // Forward to underlying awaitable
     724               1 :             return self_->active_eof_ops_->await_suspend(
     725               1 :                 self_->cached_awaitable_, h, env);
     726                 :         }
     727                 : 
     728                 :         io_result<>
     729              16 :         await_resume()
     730                 :         {
     731                 :             struct guard {
     732                 :                 any_write_sink* self;
     733              16 :                 ~guard() {
     734              16 :                     self->active_eof_ops_->destroy(self->cached_awaitable_);
     735              16 :                     self->active_eof_ops_ = nullptr;
     736              16 :                 }
     737              16 :             } g{self_};
     738              16 :             return self_->active_eof_ops_->await_resume(
     739              27 :                 self_->cached_awaitable_);
     740              16 :         }
     741                 :     };
     742              17 :     return awaitable{this};
     743                 : }
     744                 : 
     745                 : inline auto
     746              16 : any_write_sink::write_eof_buffers_(
     747                 :     std::span<const_buffer const> buffers)
     748                 : {
     749                 :     struct awaitable
     750                 :     {
     751                 :         any_write_sink* self_;
     752                 :         std::span<const_buffer const> buffers_;
     753                 : 
     754                 :         bool
     755              16 :         await_ready() const noexcept
     756                 :         {
     757              16 :             return false;
     758                 :         }
     759                 : 
     760                 :         std::coroutine_handle<>
     761              16 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     762                 :         {
     763              32 :             self_->active_write_ops_ =
     764              32 :                 self_->vt_->construct_write_eof_buffers_awaitable(
     765              16 :                     self_->sink_,
     766              16 :                     self_->cached_awaitable_,
     767                 :                     buffers_);
     768                 : 
     769              16 :             if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
     770              16 :                 return h;
     771                 : 
     772 MIS           0 :             return self_->active_write_ops_->await_suspend(
     773               0 :                 self_->cached_awaitable_, h, env);
     774                 :         }
     775                 : 
     776                 :         io_result<std::size_t>
     777 HIT          16 :         await_resume()
     778                 :         {
     779                 :             struct guard {
     780                 :                 any_write_sink* self;
     781              16 :                 ~guard() {
     782              16 :                     self->active_write_ops_->destroy(self->cached_awaitable_);
     783              16 :                     self->active_write_ops_ = nullptr;
     784              16 :                 }
     785              16 :             } g{self_};
     786              16 :             return self_->active_write_ops_->await_resume(
     787              27 :                 self_->cached_awaitable_);
     788              16 :         }
     789                 :     };
     790              16 :     return awaitable{this, buffers};
     791                 : }
     792                 : 
     793                 : template<ConstBufferSequence CB>
     794                 : auto
     795              42 : any_write_sink::write_some(CB buffers)
     796                 : {
     797                 :     struct awaitable
     798                 :     {
     799                 :         any_write_sink* self_;
     800                 :         const_buffer_array<detail::max_iovec_> ba_;
     801                 : 
     802              42 :         awaitable(
     803                 :             any_write_sink* self,
     804                 :             CB const& buffers)
     805              42 :             : self_(self)
     806              42 :             , ba_(buffers)
     807                 :         {
     808              42 :         }
     809                 : 
     810                 :         bool
     811              42 :         await_ready() const noexcept
     812                 :         {
     813              42 :             return ba_.to_span().empty();
     814                 :         }
     815                 : 
     816                 :         std::coroutine_handle<>
     817              40 :         await_suspend(std::coroutine_handle<> h, io_env const* env)
     818                 :         {
     819              40 :             self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
     820              40 :                 self_->sink_,
     821              40 :                 self_->cached_awaitable_,
     822              40 :                 ba_.to_span());
     823                 : 
     824              40 :             if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
     825              38 :                 return h;
     826                 : 
     827               2 :             return self_->active_write_ops_->await_suspend(
     828               2 :                 self_->cached_awaitable_, h, env);
     829                 :         }
     830                 : 
     831                 :         io_result<std::size_t>
     832              40 :         await_resume()
     833                 :         {
     834              40 :             if(ba_.to_span().empty())
     835               2 :                 return {{}, 0};
     836                 : 
     837                 :             struct guard {
     838                 :                 any_write_sink* self;
     839              38 :                 ~guard() {
     840              38 :                     self->active_write_ops_->destroy(self->cached_awaitable_);
     841              38 :                     self->active_write_ops_ = nullptr;
     842              38 :                 }
     843              38 :             } g{self_};
     844              38 :             return self_->active_write_ops_->await_resume(
     845              38 :                 self_->cached_awaitable_);
     846              38 :         }
     847                 :     };
     848              42 :     return awaitable{this, buffers};
     849                 : }
     850                 : 
     851                 : template<ConstBufferSequence CB>
     852                 : io_task<std::size_t>
     853              68 : any_write_sink::write(CB buffers)
     854                 : {
     855                 :     buffer_param<CB> bp(buffers);
     856                 :     std::size_t total = 0;
     857                 : 
     858                 :     for(;;)
     859                 :     {
     860                 :         auto bufs = bp.data();
     861                 :         if(bufs.empty())
     862                 :             break;
     863                 : 
     864                 :         auto [ec, n] = co_await write_(bufs);
     865                 :         total += n;
     866                 :         if(ec)
     867                 :             co_return {ec, total};
     868                 :         bp.consume(n);
     869                 :     }
     870                 : 
     871                 :     co_return {{}, total};
     872             136 : }
     873                 : 
     874                 : template<ConstBufferSequence CB>
     875                 : io_task<std::size_t>
     876              26 : any_write_sink::write_eof(CB buffers)
     877                 : {
     878                 :     const_buffer_param<CB> bp(buffers);
     879                 :     std::size_t total = 0;
     880                 : 
     881                 :     for(;;)
     882                 :     {
     883                 :         auto bufs = bp.data();
     884                 :         if(bufs.empty())
     885                 :         {
     886                 :             auto [ec] = co_await write_eof();
     887                 :             co_return {ec, total};
     888                 :         }
     889                 : 
     890                 :         if(! bp.more())
     891                 :         {
     892                 :             // Last window — send atomically with EOF
     893                 :             auto [ec, n] = co_await write_eof_buffers_(bufs);
     894                 :             total += n;
     895                 :             co_return {ec, total};
     896                 :         }
     897                 : 
     898                 :         auto [ec, n] = co_await write_(bufs);
     899                 :         total += n;
     900                 :         if(ec)
     901                 :             co_return {ec, total};
     902                 :         bp.consume(n);
     903                 :     }
     904              52 : }
     905                 : 
     906                 : } // namespace capy
     907                 : } // namespace boost
     908                 : 
     909                 : #endif
        

Generated by: LCOV version 2.3