include/boost/capy/io/any_write_stream.hpp

94.1% Lines (95/101) 85.7% Functions (30/35)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11 #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_array.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/write_stream.hpp>
19 #include <coroutine>
20 #include <boost/capy/ex/io_env.hpp>
21 #include <boost/capy/io_result.hpp>
22
23 #include <concepts>
24 #include <coroutine>
25 #include <cstddef>
26 #include <exception>
27 #include <new>
28 #include <span>
29 #include <stop_token>
30 #include <system_error>
31 #include <utility>
32
33 namespace boost {
34 namespace capy {
35
36 /** Type-erased wrapper for any WriteStream.
37
38 This class provides type erasure for any type satisfying the
39 @ref WriteStream concept, enabling runtime polymorphism for
40 write operations. It uses cached awaitable storage to achieve
41 zero steady-state allocation after construction.
42
43 The wrapper supports two construction modes:
44 - **Owning**: Pass by value to transfer ownership. The wrapper
45 allocates storage and owns the stream.
46 - **Reference**: Pass a pointer to wrap without ownership. The
47 pointed-to stream must outlive this wrapper.
48
49 @par Awaitable Preallocation
50 The constructor preallocates storage for the type-erased awaitable.
51 This reserves all virtual address space at server startup
52 so memory usage can be measured up front, rather than
53 allocating piecemeal as traffic arrives.
54
55 @par Immediate Completion
56 Operations complete immediately without suspending when the
57 buffer sequence is empty, or when the underlying stream's
58 awaitable reports readiness via `await_ready`.
59
60 @par Thread Safety
61 Not thread-safe. Concurrent operations on the same wrapper
62 are undefined behavior.
63
64 @par Example
65 @code
66 // Owning - takes ownership of the stream
67 any_write_stream stream(socket{ioc});
68
69 // Reference - wraps without ownership
70 socket sock(ioc);
71 any_write_stream stream(&sock);
72
73 const_buffer buf(data, size);
74 auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
75 @endcode
76
77 @see any_read_stream, any_stream, WriteStream
78 */
79 class any_write_stream
80 {
81 struct vtable;
82
83 template<WriteStream S>
84 struct vtable_for_impl;
85
86 // ordered for cache line coherence
87 void* stream_ = nullptr;
88 vtable const* vt_ = nullptr;
89 void* cached_awaitable_ = nullptr;
90 void* storage_ = nullptr;
91 bool awaitable_active_ = false;
92
93 public:
94 /** Destructor.
95
96 Destroys the owned stream (if any) and releases the cached
97 awaitable storage.
98 */
99 ~any_write_stream();
100
101 /** Construct a default instance.
102
103 Constructs an empty wrapper. Operations on a default-constructed
104 wrapper result in undefined behavior.
105 */
106 1x any_write_stream() = default;
107
108 /** Non-copyable.
109
110 The awaitable cache is per-instance and cannot be shared.
111 */
112 any_write_stream(any_write_stream const&) = delete;
113 any_write_stream& operator=(any_write_stream const&) = delete;
114
115 /** Construct by moving.
116
117 Transfers ownership of the wrapped stream (if owned) and
118 cached awaitable storage from `other`. After the move, `other` is
119 in a default-constructed state.
120
121 @param other The wrapper to move from.
122 */
123 2x any_write_stream(any_write_stream&& other) noexcept
124 2x : stream_(std::exchange(other.stream_, nullptr))
125 2x , vt_(std::exchange(other.vt_, nullptr))
126 2x , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
127 2x , storage_(std::exchange(other.storage_, nullptr))
128 2x , awaitable_active_(std::exchange(other.awaitable_active_, false))
129 {
130 2x }
131
132 /** Assign by moving.
133
134 Destroys any owned stream and releases existing resources,
135 then transfers ownership from `other`.
136
137 @param other The wrapper to move from.
138 @return Reference to this wrapper.
139 */
140 any_write_stream&
141 operator=(any_write_stream&& other) noexcept;
142
143 /** Construct by taking ownership of a WriteStream.
144
145 Allocates storage and moves the stream into this wrapper.
146 The wrapper owns the stream and will destroy it.
147
148 @param s The stream to take ownership of.
149 */
150 template<WriteStream S>
151 requires (!std::same_as<std::decay_t<S>, any_write_stream>)
152 any_write_stream(S s);
153
154 /** Construct by wrapping a WriteStream without ownership.
155
156 Wraps the given stream by pointer. The stream must remain
157 valid for the lifetime of this wrapper.
158
159 @param s Pointer to the stream to wrap.
160 */
161 template<WriteStream S>
162 any_write_stream(S* s);
163
164 /** Check if the wrapper contains a valid stream.
165
166 @return `true` if wrapping a stream, `false` if default-constructed
167 or moved-from.
168 */
169 bool
170 21x has_value() const noexcept
171 {
172 21x return stream_ != nullptr;
173 }
174
175 /** Check if the wrapper contains a valid stream.
176
177 @return `true` if wrapping a stream, `false` if default-constructed
178 or moved-from.
179 */
180 explicit
181 3x operator bool() const noexcept
182 {
183 3x return has_value();
184 }
185
186 /** Initiate an asynchronous write operation.
187
188 Writes data from the provided buffer sequence. The operation
189 completes when at least one byte has been written, or an error
190 occurs.
191
192 @param buffers The buffer sequence containing data to write.
193 Passed by value to ensure the sequence lives in the
194 coroutine frame across suspension points.
195
196 @return An awaitable yielding `(error_code,std::size_t)`.
197
198 @par Immediate Completion
199 The operation completes immediately without suspending
200 the calling coroutine when:
201 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
202 @li The underlying stream's awaitable reports immediate
203 readiness via `await_ready`.
204
205 @note This is a partial operation and may not process the
206 entire buffer sequence. Use the composed @ref write algorithm
207 for guaranteed complete transfer.
208
209 @par Preconditions
210 The wrapper must contain a valid stream (`has_value() == true`).
211 */
212 template<ConstBufferSequence CB>
213 auto
214 write_some(CB buffers);
215
216 protected:
217 /** Rebind to a new stream after move.
218
219 Updates the internal pointer to reference a new stream object.
220 Used by owning wrappers after move assignment when the owned
221 object has moved to a new location.
222
223 @param new_stream The new stream to bind to. Must be the same
224 type as the original stream.
225
226 @note Terminates if called with a stream of different type
227 than the original.
228 */
229 template<WriteStream S>
230 void
231 rebind(S& new_stream) noexcept
232 {
233 if(vt_ != &vtable_for_impl<S>::value)
234 std::terminate();
235 stream_ = &new_stream;
236 }
237 };
238
239 struct any_write_stream::vtable
240 {
241 // ordered by call frequency for cache line coherence
242 void (*construct_awaitable)(
243 void* stream,
244 void* storage,
245 std::span<const_buffer const> buffers);
246 bool (*await_ready)(void*);
247 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
248 io_result<std::size_t> (*await_resume)(void*);
249 void (*destroy_awaitable)(void*) noexcept;
250 std::size_t awaitable_size;
251 std::size_t awaitable_align;
252 void (*destroy)(void*) noexcept;
253 };
254
255 template<WriteStream S>
256 struct any_write_stream::vtable_for_impl
257 {
258 using Awaitable = decltype(std::declval<S&>().write_some(
259 std::span<const_buffer const>{}));
260
261 static void
262 1x do_destroy_impl(void* stream) noexcept
263 {
264 1x static_cast<S*>(stream)->~S();
265 1x }
266
267 static void
268 75x construct_awaitable_impl(
269 void* stream,
270 void* storage,
271 std::span<const_buffer const> buffers)
272 {
273 75x auto& s = *static_cast<S*>(stream);
274 75x ::new(storage) Awaitable(s.write_some(buffers));
275 75x }
276
277 static constexpr vtable value = {
278 &construct_awaitable_impl,
279 75x +[](void* p) {
280 75x return static_cast<Awaitable*>(p)->await_ready();
281 },
282 2x +[](void* p, std::coroutine_handle<> h, io_env const* env) {
283 2x return detail::call_await_suspend(
284 2x static_cast<Awaitable*>(p), h, env);
285 },
286 73x +[](void* p) {
287 73x return static_cast<Awaitable*>(p)->await_resume();
288 },
289 77x +[](void* p) noexcept {
290 12x static_cast<Awaitable*>(p)->~Awaitable();
291 },
292 sizeof(Awaitable),
293 alignof(Awaitable),
294 &do_destroy_impl
295 };
296 };
297
298 inline
299 95x any_write_stream::~any_write_stream()
300 {
301 95x if(storage_)
302 {
303 1x vt_->destroy(stream_);
304 1x ::operator delete(storage_);
305 }
306 95x if(cached_awaitable_)
307 {
308 85x if(awaitable_active_)
309 1x vt_->destroy_awaitable(cached_awaitable_);
310 85x ::operator delete(cached_awaitable_);
311 }
312 95x }
313
314 inline any_write_stream&
315 5x any_write_stream::operator=(any_write_stream&& other) noexcept
316 {
317 5x if(this != &other)
318 {
319 5x if(storage_)
320 {
321 vt_->destroy(stream_);
322 ::operator delete(storage_);
323 }
324 5x if(cached_awaitable_)
325 {
326 2x if(awaitable_active_)
327 1x vt_->destroy_awaitable(cached_awaitable_);
328 2x ::operator delete(cached_awaitable_);
329 }
330 5x stream_ = std::exchange(other.stream_, nullptr);
331 5x vt_ = std::exchange(other.vt_, nullptr);
332 5x cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
333 5x storage_ = std::exchange(other.storage_, nullptr);
334 5x awaitable_active_ = std::exchange(other.awaitable_active_, false);
335 }
336 5x return *this;
337 }
338
339 template<WriteStream S>
340 requires (!std::same_as<std::decay_t<S>, any_write_stream>)
341 1x any_write_stream::any_write_stream(S s)
342 1x : vt_(&vtable_for_impl<S>::value)
343 {
344 struct guard {
345 any_write_stream* self;
346 bool committed = false;
347 1x ~guard() {
348 1x if(!committed && self->storage_) {
349 self->vt_->destroy(self->stream_);
350 ::operator delete(self->storage_);
351 self->storage_ = nullptr;
352 self->stream_ = nullptr;
353 }
354 1x }
355 1x } g{this};
356
357 1x storage_ = ::operator new(sizeof(S));
358 1x stream_ = ::new(storage_) S(std::move(s));
359
360 // Preallocate the awaitable storage
361 1x cached_awaitable_ = ::operator new(vt_->awaitable_size);
362
363 1x g.committed = true;
364 1x }
365
366 template<WriteStream S>
367 86x any_write_stream::any_write_stream(S* s)
368 86x : stream_(s)
369 86x , vt_(&vtable_for_impl<S>::value)
370 {
371 // Preallocate the awaitable storage
372 86x cached_awaitable_ = ::operator new(vt_->awaitable_size);
373 86x }
374
375 template<ConstBufferSequence CB>
376 auto
377 79x any_write_stream::write_some(CB buffers)
378 {
379 struct awaitable
380 {
381 any_write_stream* self_;
382 const_buffer_array<detail::max_iovec_> ba_;
383
384 79x awaitable(
385 any_write_stream* self,
386 CB const& buffers) noexcept
387 79x : self_(self)
388 79x , ba_(buffers)
389 {
390 79x }
391
392 bool
393 79x await_ready() const noexcept
394 {
395 79x return ba_.to_span().empty();
396 }
397
398 std::coroutine_handle<>
399 75x await_suspend(std::coroutine_handle<> h, io_env const* env)
400 {
401 75x self_->vt_->construct_awaitable(
402 75x self_->stream_,
403 75x self_->cached_awaitable_,
404 75x ba_.to_span());
405 75x self_->awaitable_active_ = true;
406
407 75x if(self_->vt_->await_ready(self_->cached_awaitable_))
408 73x return h;
409
410 2x return self_->vt_->await_suspend(
411 2x self_->cached_awaitable_, h, env);
412 }
413
414 io_result<std::size_t>
415 77x await_resume()
416 {
417 77x if(!self_->awaitable_active_)
418 4x return {{}, 0};
419 struct guard {
420 any_write_stream* self;
421 73x ~guard() {
422 73x self->vt_->destroy_awaitable(self->cached_awaitable_);
423 73x self->awaitable_active_ = false;
424 73x }
425 73x } g{self_};
426 73x return self_->vt_->await_resume(
427 73x self_->cached_awaitable_);
428 73x }
429 };
430 79x return awaitable{this, buffers};
431 }
432
433 } // namespace capy
434 } // namespace boost
435
436 #endif
437