1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
// Copyright (c) 2026 Michael Vandeberg
3  
// Copyright (c) 2026 Michael Vandeberg
4  
//
4  
//
5  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
6  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7  
//
7  
//
8 -
// Official repository: https://github.com/cppalliance/capy
8 +
// Official repository: https://github.com/boostorg/capy
9  
//
9  
//
10  

10  

11  
#ifndef BOOST_CAPY_EX_THREAD_POOL_HPP
11  
#ifndef BOOST_CAPY_EX_THREAD_POOL_HPP
12  
#define BOOST_CAPY_EX_THREAD_POOL_HPP
12  
#define BOOST_CAPY_EX_THREAD_POOL_HPP
13  

13  

14  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/config.hpp>
15  
#include <coroutine>
15  
#include <coroutine>
16  
#include <boost/capy/ex/execution_context.hpp>
16  
#include <boost/capy/ex/execution_context.hpp>
17  
#include <cstddef>
17  
#include <cstddef>
18  
#include <string_view>
18  
#include <string_view>
19  

19  

20  
namespace boost {
20  
namespace boost {
21  
namespace capy {
21  
namespace capy {
22  

22  

23  
/** A pool of threads for executing work concurrently.
23  
/** A pool of threads for executing work concurrently.
24  

24  

25  
    Use this when you need to run coroutines on multiple threads
25  
    Use this when you need to run coroutines on multiple threads
26  
    without the overhead of creating and destroying threads for
26  
    without the overhead of creating and destroying threads for
27  
    each task. Work items are distributed across the pool using
27  
    each task. Work items are distributed across the pool using
28  
    a shared queue.
28  
    a shared queue.
29  

29  

30  
    @par Thread Safety
30  
    @par Thread Safety
31  
    Distinct objects: Safe.
31  
    Distinct objects: Safe.
32  
    Shared objects: Unsafe.
32  
    Shared objects: Unsafe.
33  

33  

34  
    @par Example
34  
    @par Example
35  
    @code
35  
    @code
36  
    thread_pool pool(4);  // 4 worker threads
36  
    thread_pool pool(4);  // 4 worker threads
37  
    auto ex = pool.get_executor();
37  
    auto ex = pool.get_executor();
38  
    ex.post(some_coroutine);
38  
    ex.post(some_coroutine);
39  
    // pool destructor waits for all work to complete
39  
    // pool destructor waits for all work to complete
40  
    @endcode
40  
    @endcode
41  
*/
41  
*/
42  
class BOOST_CAPY_DECL
42  
class BOOST_CAPY_DECL
43  
    thread_pool
43  
    thread_pool
44  
    : public execution_context
44  
    : public execution_context
45  
{
45  
{
46  
    class impl;
46  
    class impl;
47  
    impl* impl_;
47  
    impl* impl_;
48  

48  

49  
public:
49  
public:
50  
    class executor_type;
50  
    class executor_type;
51  

51  

52  
    /** Destroy the thread pool.
52  
    /** Destroy the thread pool.
53  

53  

54  
        Signals all worker threads to stop, waits for them to
54  
        Signals all worker threads to stop, waits for them to
55  
        finish, and destroys any pending work items.
55  
        finish, and destroys any pending work items.
56  
    */
56  
    */
57  
    ~thread_pool();
57  
    ~thread_pool();
58  

58  

59  
    /** Construct a thread pool.
59  
    /** Construct a thread pool.
60  

60  

61  
        Creates a pool with the specified number of worker threads.
61  
        Creates a pool with the specified number of worker threads.
62  
        If `num_threads` is zero, the number of threads is set to
62  
        If `num_threads` is zero, the number of threads is set to
63  
        the hardware concurrency, or one if that cannot be determined.
63  
        the hardware concurrency, or one if that cannot be determined.
64  

64  

65  
        @param num_threads The number of worker threads, or zero
65  
        @param num_threads The number of worker threads, or zero
66  
            for automatic selection.
66  
            for automatic selection.
67  

67  

68  
        @param thread_name_prefix The prefix for worker thread names.
68  
        @param thread_name_prefix The prefix for worker thread names.
69  
            Thread names appear as "{prefix}0", "{prefix}1", etc.
69  
            Thread names appear as "{prefix}0", "{prefix}1", etc.
70  
            The prefix is truncated to 12 characters. Defaults to
70  
            The prefix is truncated to 12 characters. Defaults to
71  
            "capy-pool-".
71  
            "capy-pool-".
72  
    */
72  
    */
73  
    explicit
73  
    explicit
74  
    thread_pool(
74  
    thread_pool(
75  
        std::size_t num_threads = 0,
75  
        std::size_t num_threads = 0,
76  
        std::string_view thread_name_prefix = "capy-pool-");
76  
        std::string_view thread_name_prefix = "capy-pool-");
77  

77  

78  
    thread_pool(thread_pool const&) = delete;
78  
    thread_pool(thread_pool const&) = delete;
79  
    thread_pool& operator=(thread_pool const&) = delete;
79  
    thread_pool& operator=(thread_pool const&) = delete;
80  

80  

81  
    /** Wait for all outstanding work to complete.
81  
    /** Wait for all outstanding work to complete.
82  

82  

83  
        Releases the internal work guard, then blocks the calling
83  
        Releases the internal work guard, then blocks the calling
84  
        thread until all outstanding work tracked by
84  
        thread until all outstanding work tracked by
85  
        @ref executor_type::on_work_started and
85  
        @ref executor_type::on_work_started and
86  
        @ref executor_type::on_work_finished completes. After all
86  
        @ref executor_type::on_work_finished completes. After all
87  
        work finishes, joins the worker threads.
87  
        work finishes, joins the worker threads.
88  

88  

89  
        If @ref stop is called while `join()` is blocking, the
89  
        If @ref stop is called while `join()` is blocking, the
90  
        pool stops without waiting for remaining work to
90  
        pool stops without waiting for remaining work to
91  
        complete. Worker threads finish their current item and
91  
        complete. Worker threads finish their current item and
92  
        exit; `join()` still waits for all threads to be joined
92  
        exit; `join()` still waits for all threads to be joined
93  
        before returning.
93  
        before returning.
94  

94  

95  
        This function is idempotent. The first call performs the
95  
        This function is idempotent. The first call performs the
96  
        join; subsequent calls return immediately.
96  
        join; subsequent calls return immediately.
97  

97  

98  
        @par Preconditions
98  
        @par Preconditions
99  
        Must not be called from a thread in this pool (undefined
99  
        Must not be called from a thread in this pool (undefined
100  
        behavior).
100  
        behavior).
101  

101  

102  
        @par Postconditions
102  
        @par Postconditions
103  
        All worker threads have been joined. The pool cannot be
103  
        All worker threads have been joined. The pool cannot be
104  
        reused.
104  
        reused.
105  

105  

106  
        @par Thread Safety
106  
        @par Thread Safety
107  
        May be called from any thread not in this pool.
107  
        May be called from any thread not in this pool.
108  
    */
108  
    */
109  
    void
109  
    void
110  
    join() noexcept;
110  
    join() noexcept;
111  

111  

112  
    /** Request all worker threads to stop.
112  
    /** Request all worker threads to stop.
113  

113  

114  
        Signals all threads to exit after finishing their current
114  
        Signals all threads to exit after finishing their current
115  
        work item. Queued work that has not started is abandoned.
115  
        work item. Queued work that has not started is abandoned.
116  
        Does not wait for threads to exit.
116  
        Does not wait for threads to exit.
117  

117  

118  
        If @ref join is blocking on another thread, calling
118  
        If @ref join is blocking on another thread, calling
119  
        `stop()` causes it to stop waiting for outstanding
119  
        `stop()` causes it to stop waiting for outstanding
120  
        work. The `join()` call still waits for worker threads
120  
        work. The `join()` call still waits for worker threads
121  
        to finish their current item and exit before returning.
121  
        to finish their current item and exit before returning.
122  
    */
122  
    */
123  
    void
123  
    void
124  
    stop() noexcept;
124  
    stop() noexcept;
125  

125  

126  
    /** Return an executor for this thread pool.
126  
    /** Return an executor for this thread pool.
127  

127  

128  
        @return An executor associated with this thread pool.
128  
        @return An executor associated with this thread pool.
129  
    */
129  
    */
130  
    executor_type
130  
    executor_type
131  
    get_executor() const noexcept;
131  
    get_executor() const noexcept;
132  
};
132  
};
133  

133  

134  
/** An executor that submits work to a thread_pool.
134  
/** An executor that submits work to a thread_pool.
135  

135  

136  
    Executors are lightweight handles that can be copied and stored.
136  
    Executors are lightweight handles that can be copied and stored.
137  
    All copies refer to the same underlying thread pool.
137  
    All copies refer to the same underlying thread pool.
138  

138  

139  
    @par Thread Safety
139  
    @par Thread Safety
140  
    Distinct objects: Safe.
140  
    Distinct objects: Safe.
141  
    Shared objects: Safe.
141  
    Shared objects: Safe.
142  
*/
142  
*/
143  
class thread_pool::executor_type
143  
class thread_pool::executor_type
144  
{
144  
{
145  
    friend class thread_pool;
145  
    friend class thread_pool;
146  

146  

147  
    thread_pool* pool_ = nullptr;
147  
    thread_pool* pool_ = nullptr;
148  

148  

149  
    explicit
149  
    explicit
150  
    executor_type(thread_pool& pool) noexcept
150  
    executor_type(thread_pool& pool) noexcept
151  
        : pool_(&pool)
151  
        : pool_(&pool)
152  
    {
152  
    {
153  
    }
153  
    }
154  

154  

155  
public:
155  
public:
156  
    /** Construct a default null executor.
156  
    /** Construct a default null executor.
157  

157  

158  
        The resulting executor is not associated with any pool.
158  
        The resulting executor is not associated with any pool.
159  
        `context()`, `dispatch()`, and `post()` require the
159  
        `context()`, `dispatch()`, and `post()` require the
160  
        executor to be associated with a pool before use.
160  
        executor to be associated with a pool before use.
161  
    */
161  
    */
162  
    executor_type() = default;
162  
    executor_type() = default;
163  

163  

164  
    /// Return the underlying thread pool.
164  
    /// Return the underlying thread pool.
165  
    thread_pool&
165  
    thread_pool&
166  
    context() const noexcept
166  
    context() const noexcept
167  
    {
167  
    {
168  
        return *pool_;
168  
        return *pool_;
169  
    }
169  
    }
170  

170  

171  
    /** Notify that work has started.
171  
    /** Notify that work has started.
172  

172  

173  
        Increments the outstanding work count. Must be paired
173  
        Increments the outstanding work count. Must be paired
174  
        with a subsequent call to @ref on_work_finished.
174  
        with a subsequent call to @ref on_work_finished.
175  

175  

176  
        @see on_work_finished, work_guard
176  
        @see on_work_finished, work_guard
177  
    */
177  
    */
178  
    BOOST_CAPY_DECL
178  
    BOOST_CAPY_DECL
179  
    void
179  
    void
180  
    on_work_started() const noexcept;
180  
    on_work_started() const noexcept;
181  

181  

182  
    /** Notify that work has finished.
182  
    /** Notify that work has finished.
183  

183  

184  
        Decrements the outstanding work count. When the count
184  
        Decrements the outstanding work count. When the count
185  
        reaches zero after @ref thread_pool::join has been called,
185  
        reaches zero after @ref thread_pool::join has been called,
186  
        the pool's worker threads are signaled to stop.
186  
        the pool's worker threads are signaled to stop.
187  

187  

188  
        @pre A preceding call to @ref on_work_started was made.
188  
        @pre A preceding call to @ref on_work_started was made.
189  

189  

190  
        @see on_work_started, work_guard
190  
        @see on_work_started, work_guard
191  
    */
191  
    */
192  
    BOOST_CAPY_DECL
192  
    BOOST_CAPY_DECL
193  
    void
193  
    void
194  
    on_work_finished() const noexcept;
194  
    on_work_finished() const noexcept;
195  

195  

196  
    /** Dispatch a coroutine for execution.
196  
    /** Dispatch a coroutine for execution.
197  

197  

198  
        Posts the coroutine to the thread pool for execution on a
198  
        Posts the coroutine to the thread pool for execution on a
199  
        worker thread and returns `std::noop_coroutine()`. Thread
199  
        worker thread and returns `std::noop_coroutine()`. Thread
200  
        pools never execute inline because no single thread "owns"
200  
        pools never execute inline because no single thread "owns"
201  
        the pool.
201  
        the pool.
202  

202  

203  
        @param h The coroutine handle to execute.
203  
        @param h The coroutine handle to execute.
204  

204  

205  
        @return `std::noop_coroutine()` always.
205  
        @return `std::noop_coroutine()` always.
206  
    */
206  
    */
207  
    std::coroutine_handle<>
207  
    std::coroutine_handle<>
208  
    dispatch(std::coroutine_handle<> h) const
208  
    dispatch(std::coroutine_handle<> h) const
209  
    {
209  
    {
210  
        post(h);
210  
        post(h);
211  
        return std::noop_coroutine();
211  
        return std::noop_coroutine();
212  
    }
212  
    }
213  

213  

214  
    /** Post a coroutine to the thread pool.
214  
    /** Post a coroutine to the thread pool.
215  

215  

216  
        The coroutine will be resumed on one of the pool's
216  
        The coroutine will be resumed on one of the pool's
217  
        worker threads.
217  
        worker threads.
218  

218  

219  
        @param h The coroutine handle to execute.
219  
        @param h The coroutine handle to execute.
220  
    */
220  
    */
221  
    BOOST_CAPY_DECL
221  
    BOOST_CAPY_DECL
222  
    void
222  
    void
223  
    post(std::coroutine_handle<> h) const;
223  
    post(std::coroutine_handle<> h) const;
224  

224  

225  
    /// Return true if two executors refer to the same thread pool.
225  
    /// Return true if two executors refer to the same thread pool.
226  
    bool
226  
    bool
227  
    operator==(executor_type const& other) const noexcept
227  
    operator==(executor_type const& other) const noexcept
228  
    {
228  
    {
229  
        return pool_ == other.pool_;
229  
        return pool_ == other.pool_;
230  
    }
230  
    }
231  
};
231  
};
232  

232  

233  
} // capy
233  
} // capy
234  
} // boost
234  
} // boost
235  

235  

236  
#endif
236  
#endif